4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
/* Registered obd types; this file walks and modifies the list under
 * obd_types_lock. */
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
/* Slab caches set up in obd_init_caches() and torn down in
 * obd_cleanup_caches(). */
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
/* Zombie import/export lists. NOTE(review): presumably guarded by
 * obd_zombie_impexp_lock; the users are not visible in this part of the
 * file - confirm. */
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 cfs_spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of helpers defined elsewhere in this file. */
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
/* Function pointer used below to release ptlrpc connections held by
 * exports and imports; it is assigned outside the visible code. */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
/*
 * Allocate an obd_device from obd_device_cachep (CFS_ALLOC_IO) and stamp it
 * with OBD_DEVICE_MAGIC, the value the sanity assertions in this file check.
 */
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 * The magic is asserted first; a device that still has a namespace attached
 * is reported with CERROR before the reference tracker is finalised and the
 * memory is released.
 */
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up a registered obd type by name.
 * Walks ::obd_types under obd_types_lock and compares typ_name with strcmp().
 * The lock is released on the match path as well as after the loop, so the
 * caller receives the type without the list lock held.
 */
99 struct obd_type *class_search_type(const char *name)
102 struct obd_type *type;
104 cfs_spin_lock(&obd_types_lock);
105 cfs_list_for_each(tmp, &obd_types) {
106 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 cfs_spin_unlock(&obd_types_lock);
112 cfs_spin_unlock(&obd_types_lock);
/*
 * Find the obd type called \a name and pin the module that provides it.
 * With HAVE_MODULE_LOADING_SUPPORT a module of the same name is requested
 * through cfs_request_module() and the type lookup is repeated after a
 * successful load; a failed load is reported on the console.
 * NOTE(review): the result of cfs_try_module_get() is not checked on the
 * visible line - confirm that this is intended.
 */
116 struct obd_type *class_get_type(const char *name)
118 struct obd_type *type = class_search_type(name);
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
122 const char *modname = name;
123 if (!cfs_request_module("%s", modname)) {
124 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
125 type = class_search_type(name);
127 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
/* the module reference is taken under the per-type lock */
133 cfs_spin_lock(&type->obd_type_lock);
135 cfs_try_module_get(type->typ_dt_ops->o_owner);
136 cfs_spin_unlock(&type->obd_type_lock);
140 EXPORT_SYMBOL(class_get_type);
/*
 * Release the module reference held for \a type, under obd_type_lock.
 * Counterpart of class_get_type().
 */
142 void class_put_type(struct obd_type *type)
145 cfs_spin_lock(&type->obd_type_lock);
147 cfs_module_put(type->typ_dt_ops->o_owner);
148 cfs_spin_unlock(&type->obd_type_lock);
150 EXPORT_SYMBOL(class_put_type);
152 #define CLASS_MAX_NAME 1024
154 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
155 struct lprocfs_vars *vars, const char *name,
156 struct lu_device_type *ldt)
158 struct obd_type *type;
163 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
165 if (class_search_type(name)) {
166 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
171 OBD_ALLOC(type, sizeof(*type));
175 OBD_ALLOC_PTR(type->typ_dt_ops);
176 OBD_ALLOC_PTR(type->typ_md_ops);
177 OBD_ALLOC(type->typ_name, strlen(name) + 1);
179 if (type->typ_dt_ops == NULL ||
180 type->typ_md_ops == NULL ||
181 type->typ_name == NULL)
184 *(type->typ_dt_ops) = *dt_ops;
185 /* md_ops is optional */
187 *(type->typ_md_ops) = *md_ops;
188 strcpy(type->typ_name, name);
189 cfs_spin_lock_init(&type->obd_type_lock);
192 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
194 if (IS_ERR(type->typ_procroot)) {
195 rc = PTR_ERR(type->typ_procroot);
196 type->typ_procroot = NULL;
202 rc = lu_device_type_init(ldt);
207 cfs_spin_lock(&obd_types_lock);
208 cfs_list_add(&type->typ_chain, &obd_types);
209 cfs_spin_unlock(&obd_types_lock);
214 if (type->typ_name != NULL)
215 OBD_FREE(type->typ_name, strlen(name) + 1);
216 if (type->typ_md_ops != NULL)
217 OBD_FREE_PTR(type->typ_md_ops);
218 if (type->typ_dt_ops != NULL)
219 OBD_FREE_PTR(type->typ_dt_ops);
220 OBD_FREE(type, sizeof(*type));
223 EXPORT_SYMBOL(class_register_type);
225 int class_unregister_type(const char *name)
227 struct obd_type *type = class_search_type(name);
231 CERROR("unknown obd type\n");
235 if (type->typ_refcnt) {
236 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
237 /* This is a bad situation, let's make the best of it */
238 /* Remove ops, but leave the name for debugging */
239 OBD_FREE_PTR(type->typ_dt_ops);
240 OBD_FREE_PTR(type->typ_md_ops);
244 if (type->typ_procroot) {
245 lprocfs_remove(&type->typ_procroot);
249 lu_device_type_fini(type->typ_lu);
251 cfs_spin_lock(&obd_types_lock);
252 cfs_list_del(&type->typ_chain);
253 cfs_spin_unlock(&obd_types_lock);
254 OBD_FREE(type->typ_name, strlen(name) + 1);
255 if (type->typ_dt_ops != NULL)
256 OBD_FREE_PTR(type->typ_dt_ops);
257 if (type->typ_md_ops != NULL)
258 OBD_FREE_PTR(type->typ_md_ops);
259 OBD_FREE(type, sizeof(*type));
261 } /* class_unregister_type */
262 EXPORT_SYMBOL(class_unregister_type);
265 * Create a new obd device.
267 * Find an empty slot in ::obd_devs[], create a new obd device in it.
269 * \param[in] type_name obd device type string.
270 * \param[in] name obd device name.
272 * \retval NULL if create fails, otherwise return the obd device
275 struct obd_device *class_newdev(const char *type_name, const char *name)
277 struct obd_device *result = NULL;
278 struct obd_device *newdev;
279 struct obd_type *type = NULL;
281 int new_obd_minor = 0;
284 if (strlen(name) >= MAX_OBD_NAME) {
285 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
286 RETURN(ERR_PTR(-EINVAL));
289 type = class_get_type(type_name);
291 CERROR("OBD: unknown type: %s\n", type_name);
292 RETURN(ERR_PTR(-ENODEV));
295 newdev = obd_device_alloc();
296 if (newdev == NULL) {
297 class_put_type(type);
298 RETURN(ERR_PTR(-ENOMEM));
300 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
302 cfs_write_lock(&obd_dev_lock);
303 for (i = 0; i < class_devno_max(); i++) {
304 struct obd_device *obd = class_num2obd(i);
306 if (obd && obd->obd_name &&
307 (strcmp(name, obd->obd_name) == 0)) {
308 CERROR("Device %s already exists at %d, won't add\n",
311 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
312 "%p obd_magic %08x != %08x\n", result,
313 result->obd_magic, OBD_DEVICE_MAGIC);
314 LASSERTF(result->obd_minor == new_obd_minor,
315 "%p obd_minor %d != %d\n", result,
316 result->obd_minor, new_obd_minor);
318 obd_devs[result->obd_minor] = NULL;
319 result->obd_name[0]='\0';
321 result = ERR_PTR(-EEXIST);
324 if (!result && !obd) {
326 result->obd_minor = i;
328 result->obd_type = type;
329 strncpy(result->obd_name, name,
330 sizeof(result->obd_name) - 1);
331 obd_devs[i] = result;
334 cfs_write_unlock(&obd_dev_lock);
336 if (result == NULL && i >= class_devno_max()) {
337 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
339 RETURN(ERR_PTR(-EOVERFLOW));
342 if (IS_ERR(result)) {
343 obd_device_free(newdev);
344 class_put_type(type);
346 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
347 result->obd_name, result);
/*
 * Remove \a obd from ::obd_devs[], free it and release its type.
 * The type pointer is saved up front because the device is freed before
 * class_put_type() is called.
 */
352 void class_release_dev(struct obd_device *obd)
354 struct obd_type *obd_type = obd->obd_type;
356 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
357 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
358 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
359 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
360 LASSERT(obd_type != NULL);
362 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
363 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
/* the slot is cleared under the obd_dev_lock write lock */
365 cfs_write_lock(&obd_dev_lock);
366 obd_devs[obd->obd_minor] = NULL;
367 cfs_write_unlock(&obd_dev_lock);
368 obd_device_free(obd);
370 class_put_type(obd_type);
/*
 * Find the ::obd_devs[] index of the device called \a name.
 * The scan runs under the obd_dev_lock read lock and a name match is only
 * honoured once the device is attached (obd_attached).
 * NOTE(review): the return statements are not visible in this listing;
 * class_name2obd() treats a negative result as "not found".
 */
373 int class_name2dev(const char *name)
380 cfs_read_lock(&obd_dev_lock);
381 for (i = 0; i < class_devno_max(); i++) {
382 struct obd_device *obd = class_num2obd(i);
384 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
385 /* Make sure we finished attaching before we give
386 out any references */
387 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
388 if (obd->obd_attached) {
389 cfs_read_unlock(&obd_dev_lock);
395 cfs_read_unlock(&obd_dev_lock);
399 EXPORT_SYMBOL(class_name2dev);
401 struct obd_device *class_name2obd(const char *name)
403 int dev = class_name2dev(name);
405 if (dev < 0 || dev > class_devno_max())
407 return class_num2obd(dev);
409 EXPORT_SYMBOL(class_name2obd);
/*
 * Find the ::obd_devs[] index of the device whose obd_uuid equals \a uuid.
 * The scan runs under the obd_dev_lock read lock, which is dropped on the
 * match path as well as after the loop.
 * NOTE(review): the return statements are not visible in this listing.
 */
411 int class_uuid2dev(struct obd_uuid *uuid)
415 cfs_read_lock(&obd_dev_lock);
416 for (i = 0; i < class_devno_max(); i++) {
417 struct obd_device *obd = class_num2obd(i);
419 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
420 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
421 cfs_read_unlock(&obd_dev_lock);
425 cfs_read_unlock(&obd_dev_lock);
429 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Resolve \a uuid to its obd device: class_uuid2dev() yields the index and
 * class_num2obd() maps it to the device.
 */
431 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
433 int dev = class_uuid2dev(uuid);
436 return class_num2obd(dev);
438 EXPORT_SYMBOL(class_uuid2obd);
441 * Get obd device from ::obd_devs[]
443 * \param num [in] array index
445 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
446 * otherwise return the obd device there.
448 struct obd_device *class_num2obd(int num)
450 struct obd_device *obd = NULL;
452 if (num < class_devno_max()) {
457 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
458 "%p obd_magic %08x != %08x\n",
459 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
460 LASSERTF(obd->obd_minor == num,
461 "%p obd_minor %0d != %0d\n",
462 obd, obd->obd_minor, num);
467 EXPORT_SYMBOL(class_num2obd);
/*
 * Print one console line per device in ::obd_devs[]: index, status, type
 * name, device name, uuid and reference count.
 * The table is walked under the obd_dev_lock read lock.
 */
469 void class_obd_list(void)
474 cfs_read_lock(&obd_dev_lock);
475 for (i = 0; i < class_devno_max(); i++) {
476 struct obd_device *obd = class_num2obd(i);
/* status is chosen from the flags in this order: stopping, set up, attached */
480 if (obd->obd_stopping)
482 else if (obd->obd_set_up)
484 else if (obd->obd_attached)
488 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
489 i, status, obd->obd_type->typ_name,
490 obd->obd_name, obd->obd_uuid.uuid,
491 cfs_atomic_read(&obd->obd_refcount));
493 cfs_read_unlock(&obd_dev_lock);
497 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
498 specified, then only the client with that uuid is returned,
499 otherwise any client connected to the tgt is returned. */
500 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
501 const char * typ_name,
502 struct obd_uuid *grp_uuid)
506 cfs_read_lock(&obd_dev_lock);
507 for (i = 0; i < class_devno_max(); i++) {
508 struct obd_device *obd = class_num2obd(i);
/* the type check is a prefix match: only strlen(typ_name) bytes are
 * compared */
512 if ((strncmp(obd->obd_type->typ_name, typ_name,
513 strlen(typ_name)) == 0)) {
/* the target uuid must match; a NULL grp_uuid accepts any client uuid */
514 if (obd_uuid_equals(tgt_uuid,
515 &obd->u.cli.cl_target_uuid) &&
516 ((grp_uuid)? obd_uuid_equals(grp_uuid,
517 &obd->obd_uuid) : 1)) {
518 cfs_read_unlock(&obd_dev_lock);
523 cfs_read_unlock(&obd_dev_lock);
527 EXPORT_SYMBOL(class_find_client_obd);
529 /* Iterate the obd_device list looking devices have grp_uuid. Start
530 searching at *next, and if a device is found, the next index to look
531 at is saved in *next. If next is NULL, then the first matching device
532 will always be returned. */
533 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
/* a caller-supplied start index is only accepted when it lies inside
 * [0, class_devno_max()) */
539 else if (*next >= 0 && *next < class_devno_max())
544 cfs_read_lock(&obd_dev_lock);
545 for (; i < class_devno_max(); i++) {
546 struct obd_device *obd = class_num2obd(i);
550 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
/* the read lock is dropped on the match path too */
553 cfs_read_unlock(&obd_dev_lock);
557 cfs_read_unlock(&obd_dev_lock);
561 EXPORT_SYMBOL(class_devices_in_group);
564 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
565 * adjust sptlrpc settings accordingly.
567 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
569 struct obd_device *obd;
573 LASSERT(namelen > 0);
575 cfs_read_lock(&obd_dev_lock);
576 for (i = 0; i < class_devno_max(); i++) {
577 obd = class_num2obd(i);
/* only devices that are set up and not stopping are considered */
579 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
582 /* only notify mdc, osc, mdt, ost */
583 type = obd->obd_type->typ_name;
584 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
585 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
586 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
587 strcmp(type, LUSTRE_OST_NAME) != 0)
/* the first namelen bytes of the device name must equal fsname */
590 if (strncmp(obd->obd_name, fsname, namelen))
/* pin the device, then drop obd_dev_lock for the duration of
 * obd_set_info_async(); the lock is taken again before the scan goes on */
593 class_incref(obd, __FUNCTION__, obd);
594 cfs_read_unlock(&obd_dev_lock);
595 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
596 sizeof(KEY_SPTLRPC_CONF),
597 KEY_SPTLRPC_CONF, 0, NULL, NULL);
599 class_decref(obd, __FUNCTION__, obd);
600 cfs_read_lock(&obd_dev_lock);
602 cfs_read_unlock(&obd_dev_lock);
605 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches() (obd_device, obdo,
 * import and capa). Every destroy is asserted to return 0.
 */
607 void obd_cleanup_caches(void)
612 if (obd_device_cachep) {
613 rc = cfs_mem_cache_destroy(obd_device_cachep);
614 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
615 obd_device_cachep = NULL;
618 rc = cfs_mem_cache_destroy(obdo_cachep);
619 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
623 rc = cfs_mem_cache_destroy(import_cachep);
624 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
625 import_cachep = NULL;
628 rc = cfs_mem_cache_destroy(capa_cachep);
629 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
/*
 * Create the obd_device, obdo, import and capa slab caches.
 * Each cache pointer is asserted to be NULL before it is created.
 * NOTE(review): obd_cleanup_caches() is called near the end, presumably on
 * the failure path - the surrounding label and returns are not visible here.
 */
635 int obd_init_caches(void)
639 LASSERT(obd_device_cachep == NULL);
640 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
641 sizeof(struct obd_device),
643 if (!obd_device_cachep)
646 LASSERT(obdo_cachep == NULL);
647 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
652 LASSERT(import_cachep == NULL);
653 import_cachep = cfs_mem_cache_create("ll_import_cache",
654 sizeof(struct obd_import),
659 LASSERT(capa_cachep == NULL);
660 capa_cachep = cfs_mem_cache_create("capa_cache",
661 sizeof(struct obd_capa), 0, 0);
667 obd_cleanup_caches();
672 /* map connection to client */
/*
 * Translate a connection handle into its export.
 * Two special cases are only logged here (a null handle, and a cookie of -1
 * which asks for a new connection); the lookup itself is
 * class_handle2object() on conn->cookie.
 */
673 struct obd_export *class_conn2export(struct lustre_handle *conn)
675 struct obd_export *export;
679 CDEBUG(D_CACHE, "looking for null handle\n");
683 if (conn->cookie == -1) { /* this means assign a new connection */
684 CDEBUG(D_CACHE, "want a new connection\n");
688 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
689 export = class_handle2object(conn->cookie);
692 EXPORT_SYMBOL(class_conn2export);
/* Map an export to an obd device.
 * NOTE(review): the function body is not visible in this listing. */
694 struct obd_device *class_exp2obd(struct obd_export *exp)
700 EXPORT_SYMBOL(class_exp2obd);
/*
 * Resolve \a conn to the obd device of its export.
 * The export reference obtained from class_conn2export() is dropped again
 * after exp_obd has been read.
 */
702 struct obd_device *class_conn2obd(struct lustre_handle *conn)
704 struct obd_export *export;
705 export = class_conn2export(conn);
707 struct obd_device *obd = export->exp_obd;
708 class_export_put(export);
713 EXPORT_SYMBOL(class_conn2obd);
/* Return the client import (u.cli.cl_import) of the device behind \a exp. */
715 struct obd_import *class_exp2cliimp(struct obd_export *exp)
717 struct obd_device *obd = exp->exp_obd;
720 return obd->u.cli.cl_import;
722 EXPORT_SYMBOL(class_exp2cliimp);
/* Return the client import (u.cli.cl_import) of the device that
 * class_conn2obd() finds for \a conn. */
724 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
726 struct obd_device *obd = class_conn2obd(conn);
729 return obd->u.cli.cl_import;
731 EXPORT_SYMBOL(class_conn2cliimp);
733 /* Export management functions */
/*
 * Final teardown of an export whose refcount is zero (asserted).
 * Releases the connection if there is one, asserts that the reply, replay
 * and hp-rpc lists are empty, calls obd_destroy_export(), drops the
 * "export" device reference and frees the memory through OBD_FREE_RCU.
 */
734 static void class_export_destroy(struct obd_export *exp)
736 struct obd_device *obd = exp->exp_obd;
739 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
740 LASSERT(obd != NULL);
742 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
743 exp->exp_client_uuid.uuid, obd->obd_name);
745 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
746 if (exp->exp_connection)
747 ptlrpc_put_connection_superhack(exp->exp_connection);
749 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
750 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
751 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
752 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
753 obd_destroy_export(exp);
754 class_decref(obd, "export", exp);
756 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle callback: take a reference on the export behind a handle. */
760 static void export_handle_addref(void *export)
762 class_export_get(export);
/* Handle operations passed to class_handle_hash() for export handles. */
765 static struct portals_handle_ops export_handle_ops = {
766 .hop_addref = export_handle_addref,
/* Take a reference on \a exp and log the resulting count. */
770 struct obd_export *class_export_get(struct obd_export *exp)
772 cfs_atomic_inc(&exp->exp_refcount);
773 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
774 cfs_atomic_read(&exp->exp_refcount));
777 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on \a exp.
 * On the final put the export must still be linked on a device export list
 * (asserted); its proc state is cleaned up and it is handed to
 * obd_zombie_export_add().
 */
779 void class_export_put(struct obd_export *exp)
781 LASSERT(exp != NULL);
782 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* the logged value is the count after this put, hence the "- 1" */
783 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
784 cfs_atomic_read(&exp->exp_refcount) - 1);
786 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
787 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
788 CDEBUG(D_IOCTL, "final put %p/%s\n",
789 exp, exp->exp_client_uuid.uuid);
791 /* release nid stat reference */
792 lprocfs_exp_cleanup(exp);
794 obd_zombie_export_add(exp);
797 EXPORT_SYMBOL(class_export_put);
799 /* Creates a new export, adds it to the hash table, and returns a
800 * pointer to it. The refcount is 2: one for the hash reference, and
801 * one for the pointer returned by this function. */
802 struct obd_export *class_new_export(struct obd_device *obd,
803 struct obd_uuid *cluuid)
805 struct obd_export *export;
806 cfs_hash_t *hash = NULL;
810 OBD_ALLOC_PTR(export);
812 return ERR_PTR(-ENOMEM);
814 export->exp_conn_cnt = 0;
815 export->exp_lock_hash = NULL;
816 export->exp_flock_hash = NULL;
/* the export starts out with two references */
817 cfs_atomic_set(&export->exp_refcount, 2);
818 cfs_atomic_set(&export->exp_rpc_count, 0);
819 cfs_atomic_set(&export->exp_cb_count, 0);
820 cfs_atomic_set(&export->exp_locks_count, 0);
821 #if LUSTRE_TRACKS_LOCK_EXP_REFS
822 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
823 cfs_spin_lock_init(&export->exp_locks_list_guard);
825 cfs_atomic_set(&export->exp_replay_count, 0);
826 export->exp_obd = obd;
827 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
828 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
829 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
830 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
831 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
832 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
/* register the handle; its cookie is what class_connect() returns */
833 class_handle_hash(&export->exp_handle, &export_handle_ops);
834 export->exp_last_request_time = cfs_time_current_sec();
835 cfs_spin_lock_init(&export->exp_lock);
836 cfs_spin_lock_init(&export->exp_rpc_lock);
837 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
838 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
839 cfs_spin_lock_init(&export->exp_bl_list_lock);
840 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
842 export->exp_sp_peer = LUSTRE_SP_ANY;
843 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
844 export->exp_client_uuid = *cluuid;
845 obd_init_export(export);
847 cfs_spin_lock(&obd->obd_dev_lock);
848 /* shouldn't happen, but might race */
849 if (obd->obd_stopping)
850 GOTO(exit_unlock, rc = -ENODEV);
/* take a reference on the uuid hash before obd_dev_lock is dropped */
852 hash = cfs_hash_getref(obd->obd_uuid_hash);
854 GOTO(exit_unlock, rc = -ENODEV);
855 cfs_spin_unlock(&obd->obd_dev_lock);
/* an export whose client uuid equals the device uuid is not put into the
 * uuid hash */
857 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
858 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
860 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
861 obd->obd_name, cluuid->uuid, rc);
862 GOTO(exit_err, rc = -EALREADY);
/* obd_stopping is checked again because the lock was dropped above */
866 cfs_spin_lock(&obd->obd_dev_lock);
867 if (obd->obd_stopping) {
868 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
869 GOTO(exit_unlock, rc = -ENODEV);
/* link the export onto the device lists while holding obd_dev_lock */
872 class_incref(obd, "export", export);
873 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
874 cfs_list_add_tail(&export->exp_obd_chain_timed,
875 &export->exp_obd->obd_exports_timed);
876 export->exp_obd->obd_num_exports++;
877 cfs_spin_unlock(&obd->obd_dev_lock);
878 cfs_hash_putref(hash);
/* error exits: unlock, then undo the handle hash and free the export */
882 cfs_spin_unlock(&obd->obd_dev_lock);
885 cfs_hash_putref(hash);
886 class_handle_unhash(&export->exp_handle);
887 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
888 obd_destroy_export(export);
889 OBD_FREE_PTR(export);
892 EXPORT_SYMBOL(class_new_export);
/*
 * Detach \a exp from its device: unhash the handle, remove it from the uuid
 * hash if present, move it to obd_unlinked_exports, take it off the timed
 * list, decrement obd_num_exports and finally drop one reference.
 */
894 void class_unlink_export(struct obd_export *exp)
896 class_handle_unhash(&exp->exp_handle);
898 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
899 /* delete an uuid-export hashitem from hashtables */
900 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
901 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
902 &exp->exp_client_uuid,
903 &exp->exp_uuid_hash);
905 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
906 cfs_list_del_init(&exp->exp_obd_chain_timed);
907 exp->exp_obd->obd_num_exports--;
908 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
909 class_export_put(exp);
911 EXPORT_SYMBOL(class_unlink_export);
913 /* Import management functions */
/*
 * Final teardown of an import whose refcount is zero (asserted).
 * Releases imp_connection and every obd_import_conn on imp_conn_list, drops
 * the "import" device reference and frees the memory through OBD_FREE_RCU.
 */
914 void class_import_destroy(struct obd_import *imp)
918 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
919 imp->imp_obd->obd_name);
921 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
923 ptlrpc_put_connection_superhack(imp->imp_connection);
/* drain the connection list: unlink, release the connection, free the entry */
925 while (!cfs_list_empty(&imp->imp_conn_list)) {
926 struct obd_import_conn *imp_conn;
928 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
929 struct obd_import_conn, oic_item);
930 cfs_list_del_init(&imp_conn->oic_item);
931 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
932 OBD_FREE(imp_conn, sizeof(*imp_conn));
935 LASSERT(imp->imp_sec == NULL);
936 class_decref(imp->imp_obd, "import", imp);
937 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Handle callback: take a reference on the import behind a handle. */
941 static void import_handle_addref(void *import)
943 class_import_get(import);
/* Handle operations passed to class_handle_hash() for import handles. */
946 static struct portals_handle_ops import_handle_ops = {
947 .hop_addref = import_handle_addref,
/* Take a reference on \a import and log the resulting count together with
 * the owning device name. */
951 struct obd_import *class_import_get(struct obd_import *import)
953 cfs_atomic_inc(&import->imp_refcount);
954 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
955 cfs_atomic_read(&import->imp_refcount),
956 import->imp_obd->obd_name);
959 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on \a imp.
 * The import must not already be on the zombie chain (asserted); on the
 * final put it is handed to obd_zombie_import_add().
 */
961 void class_import_put(struct obd_import *imp)
965 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
966 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* the logged value is the count after this put, hence the "- 1" */
968 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
969 cfs_atomic_read(&imp->imp_refcount) - 1,
970 imp->imp_obd->obd_name);
972 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
973 CDEBUG(D_INFO, "final put import %p\n", imp);
974 obd_zombie_import_add(imp);
979 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise an imp_at: the net latency estimate plus one service estimate
 * per portal (IMP_AT_MAX_PORTALS of them), each seeded with
 * INITIAL_CONNECT_TIMEOUT.
 */
981 static void init_imp_at(struct imp_at *at) {
983 at_init(&at->iat_net_latency, 0, 0);
984 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
985 /* max service estimates are tracked on the server side, so
986 don't use the AT history here, just use the last reported
987 val. (But keep hist for proc histogram, worst_ever) */
988 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialise an import for \a obd.
 * The import starts in state LUSTRE_IMP_NEW with a refcount of 2, holds an
 * "import" reference on the device and is registered in the handle hash.
 */
993 struct obd_import *class_new_import(struct obd_device *obd)
995 struct obd_import *imp;
997 OBD_ALLOC(imp, sizeof(*imp));
1001 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1002 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1003 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1004 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1005 cfs_spin_lock_init(&imp->imp_lock);
1006 imp->imp_last_success_conn = 0;
1007 imp->imp_state = LUSTRE_IMP_NEW;
/* the device reference taken here is released in class_import_destroy() */
1008 imp->imp_obd = class_incref(obd, "import", imp);
1009 cfs_mutex_init(&imp->imp_sec_mutex);
1010 cfs_waitq_init(&imp->imp_recovery_waitq);
1012 cfs_atomic_set(&imp->imp_refcount, 2);
1013 cfs_atomic_set(&imp->imp_unregistering, 0);
1014 cfs_atomic_set(&imp->imp_inflight, 0);
1015 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1016 cfs_atomic_set(&imp->imp_inval_count, 0);
1017 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1018 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1019 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1020 init_imp_at(&imp->imp_at);
1022 /* the default magic is V2, will be used in connect RPC, and
1023 * then adjusted according to the flags in request/reply. */
1024 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1028 EXPORT_SYMBOL(class_new_import);
/*
 * Start tearing down \a import: unhash its handle, bump imp_generation under
 * imp_lock and drop one reference.
 */
1030 void class_destroy_import(struct obd_import *import)
1032 LASSERT(import != NULL);
1033 LASSERT(import != LP_POISON);
1035 class_handle_unhash(&import->imp_handle);
1037 cfs_spin_lock(&import->imp_lock);
1038 import->imp_generation++;
1039 cfs_spin_unlock(&import->imp_lock);
1040 class_import_put(import);
1042 EXPORT_SYMBOL(class_destroy_import);
1044 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS): count one more reference
 * from \a lock to \a exp. The first reference links the lock onto
 * exp_locks_list and records \a exp as its target; a lock that already
 * points at a different export triggers a console warning.
 */
1046 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1048 cfs_spin_lock(&exp->exp_locks_list_guard);
1050 LASSERT(lock->l_exp_refs_nr >= 0);
1052 if (lock->l_exp_refs_target != NULL &&
1053 lock->l_exp_refs_target != exp) {
1054 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1055 exp, lock, lock->l_exp_refs_target);
/* post-increment: the branch is taken only for the very first reference */
1057 if ((lock->l_exp_refs_nr ++) == 0) {
1058 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1059 lock->l_exp_refs_target = exp;
1061 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1062 lock, exp, lock->l_exp_refs_nr);
1063 cfs_spin_unlock(&exp->exp_locks_list_guard);
1065 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS): drop one reference from
 * \a lock to \a exp. When the count reaches zero the lock is unlinked from
 * the export's list and its target is cleared; a target mismatch triggers a
 * console warning.
 */
1067 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1069 cfs_spin_lock(&exp->exp_locks_list_guard);
1070 LASSERT(lock->l_exp_refs_nr > 0);
1071 if (lock->l_exp_refs_target != exp) {
1072 LCONSOLE_WARN("lock %p, "
1073 "mismatching export pointers: %p, %p\n",
1074 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: the branch is taken when the last reference goes away */
1076 if (-- lock->l_exp_refs_nr == 0) {
1077 cfs_list_del_init(&lock->l_exp_refs_link);
1078 lock->l_exp_refs_target = NULL;
1080 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1081 lock, exp, lock->l_exp_refs_nr);
1082 cfs_spin_unlock(&exp->exp_locks_list_guard);
1084 EXPORT_SYMBOL(__class_export_del_lock_ref);
1087 /* A connection defines an export context in which preallocation can
1088 be managed. This releases the export pointer reference, and returns
1089 the export handle, so the export refcount is 1 when this function
1090 returns. */
/*
 * Create an export for (\a obd, \a cluuid) and hand its handle cookie back
 * in \a conn. The pointer reference returned by class_new_export() is
 * dropped again before the function finishes.
 */
1091 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1092 struct obd_uuid *cluuid)
1094 struct obd_export *export;
1095 LASSERT(conn != NULL);
1096 LASSERT(obd != NULL);
1097 LASSERT(cluuid != NULL);
1100 export = class_new_export(obd, cluuid);
/* class_new_export() reports failure as an ERR_PTR value */
1102 RETURN(PTR_ERR(export));
1104 conn->cookie = export->exp_handle.h_cookie;
1105 class_export_put(export);
1107 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1108 cluuid->uuid, conn->cookie);
1111 EXPORT_SYMBOL(class_connect);
1113 /* if export is involved in recovery then clean up related things */
1114 void class_export_recovery_cleanup(struct obd_export *exp)
1116 struct obd_device *obd = exp->exp_obd;
/* delayed/connected client accounting is adjusted under
 * obd_recovery_task_lock */
1118 cfs_spin_lock(&obd->obd_recovery_task_lock);
1119 if (exp->exp_delayed)
1120 obd->obd_delayed_clients--;
1121 if (obd->obd_recovering && exp->exp_in_recovery) {
/* the per-export flag itself is changed under exp_lock */
1122 cfs_spin_lock(&exp->exp_lock);
1123 exp->exp_in_recovery = 0;
1124 cfs_spin_unlock(&exp->exp_lock);
1125 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1126 cfs_atomic_dec(&obd->obd_connected_clients);
1128 cfs_spin_unlock(&obd->obd_recovery_task_lock);
1129 /** Cleanup req replay fields */
1130 if (exp->exp_req_replay_needed) {
1131 cfs_spin_lock(&exp->exp_lock);
1132 exp->exp_req_replay_needed = 0;
1133 cfs_spin_unlock(&exp->exp_lock);
1134 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1135 cfs_atomic_dec(&obd->obd_req_replay_clients);
1137 /** Cleanup lock replay data */
1138 if (exp->exp_lock_replay_needed) {
1139 cfs_spin_lock(&exp->exp_lock);
1140 exp->exp_lock_replay_needed = 0;
1141 cfs_spin_unlock(&exp->exp_lock);
1142 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1143 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1147 /* This function removes 1-3 references from the export:
1148 * 1 - for the export pointer passed in
1149 * and, if a disconnect is really needed,
1150 * 2 - for removal from the hash
1151 * 3 - in class_unlink_export
1152 * The export pointer passed to this function may be destroyed. */
1153 int class_disconnect(struct obd_export *export)
1155 int already_disconnected;
1158 if (export == NULL) {
1159 CWARN("attempting to free NULL export %p\n", export);
/* exp_disconnected is read and set inside one exp_lock critical section, so
 * only the first caller sees it clear */
1163 cfs_spin_lock(&export->exp_lock);
1164 already_disconnected = export->exp_disconnected;
1165 export->exp_disconnected = 1;
1166 cfs_spin_unlock(&export->exp_lock);
1168 /* class_cleanup(), abort_recovery(), and class_fail_export()
1169 * all end up in here, and if any of them race we shouldn't
1170 * call extra class_export_puts(). */
1171 if (already_disconnected) {
1172 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1173 GOTO(no_disconn, already_disconnected);
1176 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1177 export->exp_handle.h_cookie);
/* remove the export from the nid hash if it is still hashed there */
1179 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1180 cfs_hash_del(export->exp_obd->obd_nid_hash,
1181 &export->exp_connection->c_peer.nid,
1182 &export->exp_nid_hash);
1184 class_export_recovery_cleanup(export);
1185 class_unlink_export(export);
/* drops the reference belonging to the pointer the caller passed in */
1187 class_export_put(export);
1190 EXPORT_SYMBOL(class_disconnect);
1192 /* Return non-zero for a fully connected export */
/* The connected state is derived from exp_conn_cnt > 0, sampled under
 * exp_lock. */
1193 int class_connected_export(struct obd_export *exp)
1197 cfs_spin_lock(&exp->exp_lock);
1198 connected = (exp->exp_conn_cnt > 0);
1199 cfs_spin_unlock(&exp->exp_lock);
1204 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list, storing \a flags in exp_flags first.
 * An export whose client uuid equals the device uuid is only taken off the
 * list and is not disconnected.
 */
1206 static void class_disconnect_export_list(cfs_list_t *list,
1207 enum obd_option flags)
1210 struct obd_export *exp;
1213 /* It's possible that an export may disconnect itself, but
1214 * nothing else will be added to this list. */
1215 while (!cfs_list_empty(list)) {
1216 exp = cfs_list_entry(list->next, struct obd_export,
1218 /* need for safe call CDEBUG after obd_disconnect */
1219 class_export_get(exp);
1221 cfs_spin_lock(&exp->exp_lock);
1222 exp->exp_flags = flags;
1223 cfs_spin_unlock(&exp->exp_lock);
1225 if (obd_uuid_equals(&exp->exp_client_uuid,
1226 &exp->exp_obd->obd_uuid)) {
1228 "exp %p export uuid == obd uuid, don't discon\n",
1230 /* Need to delete this now so we don't end up pointing
1231 * to work_list later when this export is cleaned up. */
1232 cfs_list_del_init(&exp->exp_obd_chain);
1233 class_export_put(exp);
/* a second reference is taken before obd_disconnect() is called */
1237 class_export_get(exp);
1238 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1239 "last request at "CFS_TIME_T"\n",
1240 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1241 exp, exp->exp_last_request_time);
1242 /* release one export reference anyway */
1243 rc = obd_disconnect(exp);
1245 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1246 obd_export_nid2str(exp), exp, rc);
1247 class_export_put(exp);
/*
 * Disconnect all exports of \a obd: both obd_exports and obd_delayed_exports
 * are spliced onto a private work list under obd_dev_lock, and the work list
 * is then processed without the lock held.
 */
1252 void class_disconnect_exports(struct obd_device *obd)
1254 cfs_list_t work_list;
1257 /* Move all of the exports from obd_exports to a work list, en masse. */
1258 CFS_INIT_LIST_HEAD(&work_list);
1259 cfs_spin_lock(&obd->obd_dev_lock);
1260 cfs_list_splice_init(&obd->obd_exports, &work_list);
1261 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1262 cfs_spin_unlock(&obd->obd_dev_lock);
1264 if (!cfs_list_empty(&work_list)) {
1265 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1266 "disconnecting them\n", obd->obd_minor, obd);
1267 class_disconnect_export_list(&work_list,
1268 exp_flags_from_obd(obd));
1270 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1271 obd->obd_minor, obd);
1274 EXPORT_SYMBOL(class_disconnect_exports);
1276 /* Remove exports that have not completed recovery. */
/*
 * class_disconnect_stale_exports() - evict exports selected by a predicate.
 * @obd:         device whose obd_exports list is scanned.
 * @test_export: predicate called for each export with both obd->obd_dev_lock
 *               and exp->exp_lock held; presumably it must not block.
 *
 * Walks obd->obd_exports under obd->obd_dev_lock.  The self-export (client
 * uuid equal to the obd uuid) is not treated as a client.  For the remaining
 * exports the visible code releases exp_lock when @test_export returns
 * non-zero; otherwise it sets exp_failed, moves the export to a local work
 * list and logs it.  After the scan the work list is handed to
 * class_disconnect_export_list() with OBD_OPT_ABORT_RECOV added to the flags
 * from exp_flags_from_obd(obd), and obd->obd_stale_clients is increased by
 * the number of evicted exports.
 *
 * NOTE(review): short lines (braces, the evicted counter declaration and
 * increment, the continue statements and the condition guarding the console
 * warning) are not present in this listing; the skip paths are presumed to
 * use continue.
 */
1278 void class_disconnect_stale_exports(struct obd_device *obd,
1279 int (*test_export)(struct obd_export *))
1281 cfs_list_t work_list;
1282 struct obd_export *exp, *n;
1286 CFS_INIT_LIST_HEAD(&work_list);
1287 cfs_spin_lock(&obd->obd_dev_lock);
1288 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1290 /* don't count self-export as client */
1291 if (obd_uuid_equals(&exp->exp_client_uuid,
1292 &exp->exp_obd->obd_uuid))
/* Lock order here is obd_dev_lock first, then exp_lock. */
1295 cfs_spin_lock(&exp->exp_lock);
1296 if (test_export(exp)) {
1297 cfs_spin_unlock(&exp->exp_lock);
/* Export did not pass the test: mark it failed before queueing it. */
1300 exp->exp_failed = 1;
1301 cfs_spin_unlock(&exp->exp_lock);
1303 cfs_list_move(&exp->exp_obd_chain, &work_list);
1305 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1306 obd->obd_name, exp->exp_client_uuid.uuid,
1307 exp->exp_connection == NULL ? "<unknown>" :
1308 libcfs_nid2str(exp->exp_connection->c_peer.nid));
/* Still under obd_dev_lock; print_export_data() takes exp_lock itself. */
1309 print_export_data(exp, "EVICTING", 0);
1311 cfs_spin_unlock(&obd->obd_dev_lock);
1314 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1315 obd->obd_name, evicted);
1316 obd->obd_stale_clients += evicted;
/* Disconnect the collected exports outside of obd_dev_lock. */
1318 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1319 OBD_OPT_ABORT_RECOV);
1322 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * class_fail_export() - mark an export as failed and disconnect it.
 * @exp: export to fail.
 *
 * Atomically (under exp->exp_lock) reads the previous value of
 * exp->exp_failed and sets it to 1.  When the export had already been
 * failed only a debug message is logged.  Otherwise the debug log is dumped
 * if obd_dump_on_timeout is set, two references are taken on the export and
 * obd_disconnect() is called; the final class_export_put() drops the
 * reference that kept @exp valid for the closing log message.
 *
 * NOTE(review): short lines (braces, the early return after the skipping
 * message and the if/else around the CERROR and CDEBUG calls that follow
 * obd_disconnect()) are not present in this listing.
 */
1324 void class_fail_export(struct obd_export *exp)
1326 int rc, already_failed;
/* Test-and-set of exp_failed so that only one caller proceeds. */
1328 cfs_spin_lock(&exp->exp_lock);
1329 already_failed = exp->exp_failed;
1330 exp->exp_failed = 1;
1331 cfs_spin_unlock(&exp->exp_lock);
1333 if (already_failed) {
1334 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1335 exp, exp->exp_client_uuid.uuid);
1339 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1340 exp, exp->exp_client_uuid.uuid);
1342 if (obd_dump_on_timeout)
1343 libcfs_debug_dumplog();
1345 /* need for safe call CDEBUG after obd_disconnect */
1346 class_export_get(exp);
1348 /* Most callers into obd_disconnect are removing their own reference
1349 * (request, for example) in addition to the one from the hash table.
1350 * We don't have such a reference here, so make one. */
1351 class_export_get(exp);
1352 rc = obd_disconnect(exp);
1354 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1356 CDEBUG(D_HA, "disconnected export %p/%s\n",
1357 exp, exp->exp_client_uuid.uuid);
/* Release the reference taken above for the log messages. */
1358 class_export_put(exp);
1360 EXPORT_SYMBOL(class_fail_export);
/*
 * obd_export_nid2str() - printable form of the peer NID of an export.
 * @exp: export to describe.
 *
 * When the export has a connection, returns libcfs_nid2str() of the peer
 * NID stored in that connection.
 *
 * NOTE(review): the fallback return used when exp->exp_connection is NULL
 * is on a line that is not present in this listing.
 */
1362 char *obd_export_nid2str(struct obd_export *exp)
1364 if (exp->exp_connection != NULL)
1365 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1369 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * obd_export_evict_by_nid() - evict the export(s) connected from a NID.
 * @obd: device whose obd_nid_hash is searched.
 * @nid: NID in string form; converted with libcfs_str2nid().
 *
 * Looks the NID up in obd->obd_nid_hash.  A hit is asserted to carry the
 * requested NID and not to be the self-export, is logged, failed through
 * class_fail_export() and then released with class_export_put(), which
 * presumably drops a reference returned by the hash lookup - verify.
 *
 * Return: the value of exports_evicted; a debug message is logged when it
 * is zero.
 *
 * NOTE(review): short lines (braces, the enclosing loop construct, the
 * break on a failed lookup, the exports_evicted increment and the last
 * CWARN argument) are not present in this listing.
 * NOTE(review): the const qualifier of @nid is cast away for
 * libcfs_str2nid(), and the first LASSERTF dereferences exp_connection
 * without a NULL check.  The CWARN text spells adminstrative.
 */
1371 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1373 struct obd_export *doomed_exp = NULL;
1374 int exports_evicted = 0;
1376 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1379 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1380 if (doomed_exp == NULL)
/* Sanity checks on the export returned by the hash. */
1383 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1384 "nid %s found, wanted nid %s, requested nid %s\n",
1385 obd_export_nid2str(doomed_exp),
1386 libcfs_nid2str(nid_key), nid);
1387 LASSERTF(doomed_exp != obd->obd_self_export,
1388 "self-export is hashed by NID?\n");
1390 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1391 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1393 class_fail_export(doomed_exp);
1394 class_export_put(doomed_exp);
1397 if (!exports_evicted)
1398 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1399 obd->obd_name, nid);
1400 return exports_evicted;
1402 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * obd_export_evict_by_uuid() - evict the export of a client given its uuid.
 * @obd:  device whose obd_uuid_hash is searched.
 * @uuid: client uuid in string form; converted with obd_str2uuid().
 *
 * Refuses to evict when @uuid equals the uuid of @obd itself.  Otherwise
 * the uuid is looked up in obd->obd_uuid_hash; a miss is reported with
 * CERROR, a hit is logged, failed through class_fail_export() and released
 * with class_export_put().
 *
 * Return: the value of exports_evicted.
 *
 * NOTE(review): short lines (braces, the else keyword and the
 * exports_evicted increment) are not present in this listing.  The CWARN
 * text spells adminstrative.
 */
1404 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1406 struct obd_export *doomed_exp = NULL;
1407 struct obd_uuid doomed_uuid;
1408 int exports_evicted = 0;
1410 obd_str2uuid(&doomed_uuid, uuid);
/* The uuid of the device itself is rejected. */
1411 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1412 CERROR("%s: can't evict myself\n", obd->obd_name);
1413 return exports_evicted;
1416 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1418 if (doomed_exp == NULL) {
1419 CERROR("%s: can't disconnect %s: no exports found\n",
1420 obd->obd_name, uuid);
1422 CWARN("%s: evicting %s at adminstrative request\n",
1423 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1424 class_fail_export(doomed_exp);
1425 class_export_put(doomed_exp);
1429 return exports_evicted;
1431 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/*
 * Optional per-export dump callback, only built when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is non-zero.  It starts out NULL and is
 * exported so that another module can install a handler; print_export_data()
 * calls it when lock dumping is requested and a handler is set.
 *
 * NOTE(review): the matching #endif is not present in this listing.
 */
1433 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1434 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1435 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * print_export_data() - log a one-line summary of an export at D_HA level.
 * @exp:    export to describe.
 * @status: caller-chosen tag printed in the message (callers in this file
 *          pass strings such as EVICTING, ACTIVE, UNLINKED, DELAYED, ZOMBIE).
 *
 * Walks exp->exp_outstanding_replies under exp->exp_lock, then prints, in
 * this order: obd name, status, export pointer, client uuid, peer NID,
 * refcount, (rpc count, callback count, lock count), the disconnected,
 * delayed and failed flags, the reply count, the first reply pointer, an
 * ellipsis when more than three replies are outstanding, and
 * exp_last_committed.  When LUSTRE_TRACKS_LOCK_EXP_REFS is enabled and the
 * locks argument is non-zero, class_export_dump_hook is invoked if set.
 *
 * NOTE(review): short lines (braces, the remaining parameter of the
 * signature, the nreplies declaration, the body of the reply loop and the
 * #endif) are not present in this listing.
 */
1438 static void print_export_data(struct obd_export *exp, const char *status,
1441 struct ptlrpc_reply_state *rs;
1442 struct ptlrpc_reply_state *first_reply = NULL;
1445 cfs_spin_lock(&exp->exp_lock);
1446 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1452 cfs_spin_unlock(&exp->exp_lock);
1454 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1455 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1456 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1457 cfs_atomic_read(&exp->exp_rpc_count),
1458 cfs_atomic_read(&exp->exp_cb_count),
1459 cfs_atomic_read(&exp->exp_locks_count),
1460 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1461 nreplies, first_reply, nreplies > 3 ? "..." : "",
1462 exp->exp_last_committed);
1463 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1464 if (locks && class_export_dump_hook != NULL)
1465 class_export_dump_hook(exp);
/*
 * dump_exports() - log every export associated with an obd device.
 * @obd:   device whose export lists are dumped.
 * @locks: forwarded to print_export_data() as its last argument.
 *
 * Under obd->obd_dev_lock, prints the exports on obd_exports (tagged
 * ACTIVE), obd_unlinked_exports (UNLINKED) and obd_delayed_exports
 * (DELAYED).  Then, under obd_zombie_impexp_lock, prints every entry of the
 * global obd_zombie_exports list (ZOMBIE); that last list is not filtered
 * by @obd in the visible code.
 *
 * NOTE(review): brace lines are not present in this listing.
 */
1469 void dump_exports(struct obd_device *obd, int locks)
1471 struct obd_export *exp;
1473 cfs_spin_lock(&obd->obd_dev_lock);
1474 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1475 print_export_data(exp, "ACTIVE", locks);
1476 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1477 print_export_data(exp, "UNLINKED", locks);
1478 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1479 print_export_data(exp, "DELAYED", locks);
1480 cfs_spin_unlock(&obd->obd_dev_lock);
/* Zombie exports live on a global list guarded by its own lock. */
1481 cfs_spin_lock(&obd_zombie_impexp_lock);
1482 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1483 print_export_data(exp, "ZOMBIE", locks);
1484 cfs_spin_unlock(&obd_zombie_impexp_lock);
1486 EXPORT_SYMBOL(dump_exports);
/*
 * obd_exports_barrier() - wait until a device has no unlinked exports left.
 * @obd: device to wait on; its obd_exports list is asserted to be empty.
 *
 * Polls obd->obd_unlinked_exports under obd->obd_dev_lock.  While the list
 * is not empty the lock is dropped and the caller sleeps uninterruptibly
 * for cfs_time_seconds(waited).  Once waited exceeds 5 and is a power of
 * two, a console warning with the obd refcount is printed and all exports
 * are dumped with lock information.
 *
 * NOTE(review): short lines (braces, the declaration and initial value of
 * waited and the statement that updates it between iterations) are not
 * present in this listing.
 */
1488 void obd_exports_barrier(struct obd_device *obd)
1491 LASSERT(cfs_list_empty(&obd->obd_exports));
1492 cfs_spin_lock(&obd->obd_dev_lock);
1493 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
/* Never sleep with the spinlock held: drop it for the wait. */
1494 cfs_spin_unlock(&obd->obd_dev_lock);
1495 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1496 cfs_time_seconds(waited));
1497 if (waited > 5 && IS_PO2(waited)) {
1498 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1499 "more than %d seconds. "
1500 "The obd refcount = %d. Is it stuck?\n",
1501 obd->obd_name, waited,
1502 cfs_atomic_read(&obd->obd_refcount));
1503 dump_exports(obd, 1);
/* Re-take the lock before re-checking the list in the loop condition. */
1506 cfs_spin_lock(&obd->obd_dev_lock);
1508 cfs_spin_unlock(&obd->obd_dev_lock);
1510 EXPORT_SYMBOL(obd_exports_barrier);
/*
 * Counter of zombie imports/exports awaiting destruction.  The visible
 * readers in this file (obd_zombie_impexp_check() and obd_zombie_is_idle())
 * access it under obd_zombie_impexp_lock.
 *
 * NOTE(review): the statements that increment and decrement the counter
 * are not present in this listing.
 */
1512 /* Total amount of zombies to be destroyed */
1513 static int zombies_count = 0;
1516 /* Kill zombie imports and exports. */
/*
 * obd_zombie_impexp_cull() - destroy all queued zombie imports and exports.
 *
 * Each pass takes obd_zombie_impexp_lock, detaches at most one import from
 * obd_zombie_imports and at most one export from obd_zombie_exports, and
 * releases the lock.  The detached objects are then destroyed with
 * class_import_destroy() and class_export_destroy() outside the lock, and
 * the lock is briefly re-taken after each destruction.  The loop repeats
 * until a pass finds neither an import nor an export.
 *
 * NOTE(review): short lines (braces, the do keyword, the resetting of
 * import and export at the start of a pass, the member arguments of
 * cfs_list_entry() and the statements executed inside the re-taken lock,
 * presumably the zombies_count update) are not present in this listing.
 */
1518 void obd_zombie_impexp_cull(void)
1520 struct obd_import *import;
1521 struct obd_export *export;
1525 cfs_spin_lock(&obd_zombie_impexp_lock);
/* Detach the first zombie import, if any. */
1528 if (!cfs_list_empty(&obd_zombie_imports)) {
1529 import = cfs_list_entry(obd_zombie_imports.next,
1532 cfs_list_del_init(&import->imp_zombie_chain);
/* Detach the first zombie export, if any. */
1536 if (!cfs_list_empty(&obd_zombie_exports)) {
1537 export = cfs_list_entry(obd_zombie_exports.next,
1540 cfs_list_del_init(&export->exp_obd_chain);
1543 cfs_spin_unlock(&obd_zombie_impexp_lock);
/* Destruction itself runs without the zombie lock held. */
1545 if (import != NULL) {
1546 class_import_destroy(import);
1547 cfs_spin_lock(&obd_zombie_impexp_lock);
1549 cfs_spin_unlock(&obd_zombie_impexp_lock);
1552 if (export != NULL) {
1553 class_export_destroy(export);
1554 cfs_spin_lock(&obd_zombie_impexp_lock);
1556 cfs_spin_unlock(&obd_zombie_impexp_lock);
1560 } while (import != NULL || export != NULL);
/*
 * State shared between the zombie destroy thread and its controllers:
 *  - obd_zombie_start: completed by the thread once it has started (or
 *    failed to daemonize); obd_zombie_impexp_init() waits on it.
 *  - obd_zombie_stop:  completed by the thread on exit;
 *    obd_zombie_impexp_stop() waits on it.
 *  - obd_zombie_flags: bit field holding OBD_ZOMBIE_STOP.
 *  - obd_zombie_waitq: wait queue used by the thread and by
 *    obd_zombie_barrier().
 *  - obd_zombie_pid:   pid recorded by the thread so that
 *    obd_zombie_barrier() can avoid waiting for itself.
 *
 * NOTE(review): the enum keyword and braces around OBD_ZOMBIE_STOP are not
 * present in this listing.  The constant is passed as the first argument of
 * cfs_test_bit() and cfs_set_bit() in this file; if those take a bit number
 * (as their names suggest), the value 1 << 1 selects bit 2 rather than
 * acting as a mask - harmless while used consistently, but confirm.
 */
1564 static cfs_completion_t obd_zombie_start;
1565 static cfs_completion_t obd_zombie_stop;
1566 static unsigned long obd_zombie_flags;
1567 static cfs_waitq_t obd_zombie_waitq;
1568 static pid_t obd_zombie_pid;
1571 OBD_ZOMBIE_STOP = 1 << 1
1575 /* Check for work for the zombie import/export kill thread. */
/*
 * obd_zombie_impexp_check() - test whether the zombie thread may stay idle.
 * @arg: not referenced in the visible lines.
 *
 * Under obd_zombie_impexp_lock, computes rc as true when zombies_count is
 * zero AND the OBD_ZOMBIE_STOP bit is not set.  In other words the result
 * is non-zero when there is nothing to do; obd_zombie_impexp_thread() waits
 * on the negation of this function.
 *
 * NOTE(review): short lines (braces, the rc declaration and the return
 * statement) are not present in this listing.
 */
1577 static int obd_zombie_impexp_check(void *arg)
1581 cfs_spin_lock(&obd_zombie_impexp_lock);
1582 rc = (zombies_count == 0) &&
1583 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1584 cfs_spin_unlock(&obd_zombie_impexp_lock);
1590 /* Add export to the obd_zombie thread and notify it. */
/*
 * obd_zombie_export_add() - queue an export for deferred destruction.
 * @exp: export to queue; it is asserted to still be linked on a list
 *       through exp_obd_chain.
 *
 * Unlinks @exp from its current list under the obd_dev_lock of its obd,
 * then links it onto the global obd_zombie_exports list under
 * obd_zombie_impexp_lock and wakes the waiters of the zombie wait queue via
 * obd_zombie_impexp_notify().  The two locks are taken one after the other,
 * never nested.
 *
 * NOTE(review): a short line between taking obd_zombie_impexp_lock and the
 * list insertion (presumably the zombies_count increment) and the closing
 * brace are not present in this listing.
 */
1592 static void obd_zombie_export_add(struct obd_export *exp) {
1593 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1594 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1595 cfs_list_del_init(&exp->exp_obd_chain);
1596 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1597 cfs_spin_lock(&obd_zombie_impexp_lock);
1599 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1600 cfs_spin_unlock(&obd_zombie_impexp_lock);
1602 obd_zombie_impexp_notify();
1606 /* Add import to the obd_zombie thread and notify it. */
/*
 * obd_zombie_import_add() - queue an import for deferred destruction.
 * @imp: import to queue.  Its imp_sec and imp_rq_pool members are asserted
 *       to be NULL, and it is asserted not to be on a zombie list already.
 *
 * Links @imp onto the global obd_zombie_imports list under
 * obd_zombie_impexp_lock and wakes the waiters of the zombie wait queue via
 * obd_zombie_impexp_notify().
 *
 * NOTE(review): a short line before the list insertion (presumably the
 * zombies_count increment) and the closing brace are not present in this
 * listing.
 */
1608 static void obd_zombie_import_add(struct obd_import *imp) {
1609 LASSERT(imp->imp_sec == NULL);
1610 LASSERT(imp->imp_rq_pool == NULL);
1611 cfs_spin_lock(&obd_zombie_impexp_lock);
1612 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1614 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1615 cfs_spin_unlock(&obd_zombie_impexp_lock);
1617 obd_zombie_impexp_notify();
1621 /* Notify the import/export destroy thread about a new zombie. */
/*
 * obd_zombie_impexp_notify() - wake everything sleeping on obd_zombie_waitq.
 *
 * Uses a broadcast wake-up.  The three text lines inside the body are the
 * remains of a block comment explaining why: a single wake-up could be
 * consumed by a caller of obd_zombie_barrier() instead of the destroy
 * thread, which would leave the thread asleep.
 *
 * NOTE(review): the braces and the opening and closing markers of that
 * inner comment are not present in this listing.
 */
1623 static void obd_zombie_impexp_notify(void)
1626 * Make sure obd_zomebie_impexp_thread get this notification.
1627 * It is possible this signal only get by obd_zombie_barrier, and
1628 * barrier gulps this notification and sleeps away and hangs ensues
1630 cfs_waitq_broadcast(&obd_zombie_waitq);
1634 /* Check whether obd_zombie is idle. */
/*
 * obd_zombie_is_idle() - test whether no zombies are pending.
 *
 * Asserts that the OBD_ZOMBIE_STOP bit is not set, then computes rc as
 * (zombies_count == 0) under obd_zombie_impexp_lock.  Used as the wait
 * condition of obd_zombie_barrier().
 *
 * NOTE(review): short lines (braces, the rc declaration and the return
 * statement) are not present in this listing.
 */
1636 static int obd_zombie_is_idle(void)
1640 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1641 cfs_spin_lock(&obd_zombie_impexp_lock);
1642 rc = (zombies_count == 0);
1643 cfs_spin_unlock(&obd_zombie_impexp_lock);
1648 /* Wait until the obd_zombie import/export queues become empty. */
/*
 * obd_zombie_barrier() - block until all queued zombies have been destroyed.
 *
 * Sleeps on obd_zombie_waitq until obd_zombie_is_idle() holds, using a
 * zero-initialised l_wait_info.  When called from the zombie thread itself
 * (the current pid equals obd_zombie_pid) the wait is meant to be skipped.
 *
 * NOTE(review): the braces and the statement executed for the self-call
 * case (presumably a return) are not present in this listing.
 */
1650 void obd_zombie_barrier(void)
1652 struct l_wait_info lwi = { 0 };
1654 if (obd_zombie_pid == cfs_curproc_pid())
1655 /* don't wait for myself */
1657 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1659 EXPORT_SYMBOL(obd_zombie_barrier);
1664 /* Thread that destroys zombie exports/imports. */
/*
 * obd_zombie_impexp_thread() - main function of the zombie destroy thread.
 * @unused: not referenced in the visible lines.
 *
 * Daemonizes as obd_zombid; obd_zombie_start is completed both when that
 * fails and when it succeeds, so the starter is always released.  The
 * thread then records its pid in obd_zombie_pid and loops until the
 * OBD_ZOMBIE_STOP bit is set: it sleeps on obd_zombie_waitq until
 * obd_zombie_impexp_check() returns zero, culls all zombies, and signals
 * the wait queue so that obd_zombie_barrier() callers can re-check.  On
 * exit obd_zombie_stop is completed.
 *
 * NOTE(review): short lines (braces, the rc declaration, the return
 * statements and the markers of the inner comment whose text remains at
 * the line mentioning obd_zombie_barrier) are not present in this listing.
 * NOTE(review): obd_zombie_pid is assigned after obd_zombie_start has been
 * completed, so the starter may run before the pid is recorded - confirm
 * this ordering is acceptable.
 */
1666 static int obd_zombie_impexp_thread(void *unused)
1670 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1671 cfs_complete(&obd_zombie_start);
1675 cfs_complete(&obd_zombie_start);
1677 obd_zombie_pid = cfs_curproc_pid();
1679 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1680 struct l_wait_info lwi = { 0 };
1682 l_wait_event(obd_zombie_waitq,
1683 !obd_zombie_impexp_check(NULL), &lwi);
1684 obd_zombie_impexp_cull();
1687 * Notify obd_zombie_barrier callers that queues
1690 cfs_waitq_signal(&obd_zombie_waitq);
1693 cfs_complete(&obd_zombie_stop);
/*
 * Non-kernel build state:
 *  - zombie_recur: atomic counter used by obd_zombie_impexp_kill() so that
 *    only the first concurrent or nested caller performs the cull.
 *  - obd_zombie_impexp_work_cb / obd_zombie_impexp_idle_cb: handles returned
 *    by the liblustre wait and idle callback registration calls in
 *    obd_zombie_impexp_init() and passed back on deregistration in
 *    obd_zombie_impexp_stop().
 *
 * NOTE(review): the conditional that this #else belongs to is opened on a
 * line that is not present in this listing.
 */
1698 #else /* ! KERNEL */
1700 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1701 static void *obd_zombie_impexp_work_cb;
1702 static void *obd_zombie_impexp_idle_cb;
/*
 * obd_zombie_impexp_kill() - cull zombies from a callback context.
 * @arg: not referenced in the visible lines.
 *
 * Increments zombie_recur and runs obd_zombie_impexp_cull() only when the
 * incremented value is 1, i.e. when no other invocation is in progress;
 * the counter is decremented again afterwards.  The function is registered
 * as a liblustre wait callback by obd_zombie_impexp_init().
 *
 * NOTE(review): short lines (braces, the rc declaration and assignment and
 * the return statement) are not present in this listing, so the returned
 * value cannot be confirmed from here.
 */
1704 int obd_zombie_impexp_kill(void *arg)
1708 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1709 obd_zombie_impexp_cull();
1712 cfs_atomic_dec(&zombie_recur);
1719 /* Start the thread that destroys zombie imports/exports. */
/*
 * obd_zombie_impexp_init() - set up zombie import/export destruction.
 *
 * Initialises the two zombie lists, obd_zombie_impexp_lock, both
 * completions and the wait queue.  The visible lines then create
 * obd_zombie_impexp_thread and wait for obd_zombie_start, and register
 * obd_zombie_impexp_kill as a liblustre wait callback and
 * obd_zombie_impexp_check as a liblustre idle callback.
 *
 * NOTE(review): the thread creation and the liblustre registrations
 * presumably belong to opposite branches of a kernel/userspace conditional;
 * the preprocessor lines, the rc declaration, the error check after
 * cfs_create_thread() and the return statement are not present in this
 * listing.
 */
1721 int obd_zombie_impexp_init(void)
1725 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1726 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1727 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1728 cfs_init_completion(&obd_zombie_start);
1729 cfs_init_completion(&obd_zombie_stop);
1730 cfs_waitq_init(&obd_zombie_waitq);
1734 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
/* Block until the new thread reports that it has started. */
1738 cfs_wait_for_completion(&obd_zombie_start);
1741 obd_zombie_impexp_work_cb =
1742 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1743 &obd_zombie_impexp_kill, NULL);
1745 obd_zombie_impexp_idle_cb =
1746 liblustre_register_idle_callback("obd_zombi_impexp_check",
1747 &obd_zombie_impexp_check, NULL);
1753 /* Stop the thread that destroys zombie imports/exports. */
/*
 * obd_zombie_impexp_stop() - shut down zombie import/export destruction.
 *
 * Sets the OBD_ZOMBIE_STOP bit, wakes the waiters of obd_zombie_waitq and
 * the visible lines then wait for obd_zombie_stop to be completed and
 * deregister both liblustre callbacks.
 *
 * NOTE(review): the wait on the completion and the callback deregistration
 * presumably belong to opposite branches of a kernel/userspace conditional
 * whose preprocessor lines are not present in this listing.
 */
1755 void obd_zombie_impexp_stop(void)
1757 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1758 obd_zombie_impexp_notify();
1760 cfs_wait_for_completion(&obd_zombie_stop);
1762 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1763 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1767 /***** Kernel-userspace comm helpers *******/
/*
 * kuc_len() - total size of a kernel-userspace message.
 * @payload_len: size of the payload in bytes.
 *
 * Return: sizeof(struct kuc_hdr) plus @payload_len, converted to int.
 *
 * NOTE(review): @payload_len is not range-checked here; a negative or very
 * large value would yield a meaningless length after conversion to int.
 */
1769 /* Get length of entire message, including header */
1770 int kuc_len(int payload_len)
1772 return sizeof(struct kuc_hdr) + payload_len;
1774 EXPORT_SYMBOL(kuc_len);
1776 /* Get a pointer to kuc header, given a ptr to the payload
1777 * @param p Pointer to payload area
1778 * @returns Pointer to kuc header */
/*
 * kuc_ptr() - map a payload pointer back to its message header.
 * @p: pointer to the payload area; the header is taken to sit immediately
 *     before it in memory.
 *
 * Steps back by one struct kuc_hdr from @p and asserts that the magic field
 * found there equals KUC_MAGIC.
 *
 * NOTE(review): the braces and the return statement are not present in
 * this listing.
 */
1780 struct kuc_hdr * kuc_ptr(void *p)
1782 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1783 LASSERT(lh->kuc_magic == KUC_MAGIC);
1786 EXPORT_SYMBOL(kuc_ptr);
1788 /* Test if payload is part of kuc message
1789 * @param p Pointer to payload area */
/*
 * kuc_ispayload() - check whether a pointer is preceded by a kuc header.
 * @p: candidate payload pointer.
 *
 * Reads the struct kuc_hdr sized region immediately before @p and compares
 * its magic field with KUC_MAGIC.  Unlike kuc_ptr() no assertion is made.
 *
 * NOTE(review): the braces and the return statements are not present in
 * this listing.  The memory before @p is read unconditionally, so the
 * caller must guarantee it is accessible.
 */
1792 int kuc_ispayload(void *p)
1794 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1796 if (kh->kuc_magic == KUC_MAGIC)
1801 EXPORT_SYMBOL(kuc_ispayload);
1803 /* Alloc space for a message, and fill in header
1804 * @return Pointer to payload area */
/*
 * kuc_alloc() - allocate a kernel-userspace message and fill in its header.
 * @payload_len: size of the payload in bytes.
 * @transport:   value stored in the kuc_transport header field.
 * @type:        value stored in the kuc_msgtype header field.
 *
 * The total length is kuc_len(@payload_len).  The header receives
 * KUC_MAGIC, @transport, @type and the total length.
 *
 * Return: pointer to the payload area, which starts directly after the
 * header, or ERR_PTR(-ENOMEM) on the failure path.
 *
 * NOTE(review): short lines (braces, the declaration of lh, the allocation
 * call and its NULL check) are not present in this listing.
 */
1806 void *kuc_alloc(int payload_len, int transport, int type)
1809 int len = kuc_len(payload_len);
1813 return ERR_PTR(-ENOMEM);
1815 lh->kuc_magic = KUC_MAGIC;
1816 lh->kuc_transport = transport;
1817 lh->kuc_msgtype = type;
1818 lh->kuc_msglen = len;
/* The payload follows the header immediately. */
1820 return (void *)(lh + 1);
1822 EXPORT_SYMBOL(kuc_alloc);
/*
 * kuc_free() - release a kernel-userspace message.
 * @p:           pointer to the payload area of the message.
 * @payload_len: payload size; presumably it must equal the value that was
 *               used when the message was allocated - verify against
 *               callers.
 *
 * Recovers the header with kuc_ptr() (which asserts the magic) and frees
 * kuc_len(@payload_len) bytes starting at the header with OBD_FREE().
 *
 * NOTE(review): the function is declared inline and also exported.  Under
 * C99 inline semantics a plain inline definition does not by itself provide
 * an external definition; confirm the build relies on gnu89 inline
 * semantics.  The brace lines are not present in this listing.
 */
1824 /* Takes pointer to payload area */
1825 inline void kuc_free(void *p, int payload_len)
1827 struct kuc_hdr *lh = kuc_ptr(p);
1828 OBD_FREE(lh, kuc_len(payload_len));
1830 EXPORT_SYMBOL(kuc_free);