4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 cfs_spinlock_t obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 struct obd_type *class_search_type(const char *name)
102 struct obd_type *type;
104 cfs_spin_lock(&obd_types_lock);
105 cfs_list_for_each(tmp, &obd_types) {
106 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 cfs_spin_unlock(&obd_types_lock);
112 cfs_spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
128 if (!cfs_request_module("%s", modname)) {
129 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
130 type = class_search_type(name);
132 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
138 cfs_spin_lock(&type->obd_type_lock);
140 cfs_try_module_get(type->typ_dt_ops->o_owner);
141 cfs_spin_unlock(&type->obd_type_lock);
145 EXPORT_SYMBOL(class_get_type);
147 void class_put_type(struct obd_type *type)
150 cfs_spin_lock(&type->obd_type_lock);
152 cfs_module_put(type->typ_dt_ops->o_owner);
153 cfs_spin_unlock(&type->obd_type_lock);
155 EXPORT_SYMBOL(class_put_type);
157 #define CLASS_MAX_NAME 1024
159 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
160 struct lprocfs_vars *vars, const char *name,
161 struct lu_device_type *ldt)
163 struct obd_type *type;
168 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
170 if (class_search_type(name)) {
171 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
176 OBD_ALLOC(type, sizeof(*type));
180 OBD_ALLOC_PTR(type->typ_dt_ops);
181 OBD_ALLOC_PTR(type->typ_md_ops);
182 OBD_ALLOC(type->typ_name, strlen(name) + 1);
184 if (type->typ_dt_ops == NULL ||
185 type->typ_md_ops == NULL ||
186 type->typ_name == NULL)
189 *(type->typ_dt_ops) = *dt_ops;
190 /* md_ops is optional */
192 *(type->typ_md_ops) = *md_ops;
193 strcpy(type->typ_name, name);
194 cfs_spin_lock_init(&type->obd_type_lock);
197 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
199 if (IS_ERR(type->typ_procroot)) {
200 rc = PTR_ERR(type->typ_procroot);
201 type->typ_procroot = NULL;
207 rc = lu_device_type_init(ldt);
212 cfs_spin_lock(&obd_types_lock);
213 cfs_list_add(&type->typ_chain, &obd_types);
214 cfs_spin_unlock(&obd_types_lock);
219 if (type->typ_name != NULL)
220 OBD_FREE(type->typ_name, strlen(name) + 1);
221 if (type->typ_md_ops != NULL)
222 OBD_FREE_PTR(type->typ_md_ops);
223 if (type->typ_dt_ops != NULL)
224 OBD_FREE_PTR(type->typ_dt_ops);
225 OBD_FREE(type, sizeof(*type));
228 EXPORT_SYMBOL(class_register_type);
230 int class_unregister_type(const char *name)
232 struct obd_type *type = class_search_type(name);
236 CERROR("unknown obd type\n");
240 if (type->typ_refcnt) {
241 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
242 /* This is a bad situation, let's make the best of it */
243 /* Remove ops, but leave the name for debugging */
244 OBD_FREE_PTR(type->typ_dt_ops);
245 OBD_FREE_PTR(type->typ_md_ops);
249 /* we do not use type->typ_procroot as for compatibility purposes
250 * other modules can share names (i.e. lod can use lov entry). so
251 * we can't reference pointer as it can get invalided when another
252 * module removes the entry */
253 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
256 lu_device_type_fini(type->typ_lu);
258 cfs_spin_lock(&obd_types_lock);
259 cfs_list_del(&type->typ_chain);
260 cfs_spin_unlock(&obd_types_lock);
261 OBD_FREE(type->typ_name, strlen(name) + 1);
262 if (type->typ_dt_ops != NULL)
263 OBD_FREE_PTR(type->typ_dt_ops);
264 if (type->typ_md_ops != NULL)
265 OBD_FREE_PTR(type->typ_md_ops);
266 OBD_FREE(type, sizeof(*type));
268 } /* class_unregister_type */
269 EXPORT_SYMBOL(class_unregister_type);
272 * Create a new obd device.
274 * Find an empty slot in ::obd_devs[], create a new obd device in it.
276 * \param[in] type_name obd device type string.
277 * \param[in] name obd device name.
279 * \retval NULL if create fails, otherwise return the obd device
282 struct obd_device *class_newdev(const char *type_name, const char *name)
284 struct obd_device *result = NULL;
285 struct obd_device *newdev;
286 struct obd_type *type = NULL;
288 int new_obd_minor = 0;
291 if (strlen(name) >= MAX_OBD_NAME) {
292 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
293 RETURN(ERR_PTR(-EINVAL));
296 type = class_get_type(type_name);
298 CERROR("OBD: unknown type: %s\n", type_name);
299 RETURN(ERR_PTR(-ENODEV));
302 newdev = obd_device_alloc();
304 GOTO(out_type, result = ERR_PTR(-ENOMEM));
306 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
308 cfs_write_lock(&obd_dev_lock);
309 for (i = 0; i < class_devno_max(); i++) {
310 struct obd_device *obd = class_num2obd(i);
312 if (obd && obd->obd_name &&
313 (strcmp(name, obd->obd_name) == 0)) {
314 CERROR("Device %s already exists at %d, won't add\n",
317 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
318 "%p obd_magic %08x != %08x\n", result,
319 result->obd_magic, OBD_DEVICE_MAGIC);
320 LASSERTF(result->obd_minor == new_obd_minor,
321 "%p obd_minor %d != %d\n", result,
322 result->obd_minor, new_obd_minor);
324 obd_devs[result->obd_minor] = NULL;
325 result->obd_name[0]='\0';
327 result = ERR_PTR(-EEXIST);
330 if (!result && !obd) {
332 result->obd_minor = i;
334 result->obd_type = type;
335 strncpy(result->obd_name, name,
336 sizeof(result->obd_name) - 1);
337 obd_devs[i] = result;
340 cfs_write_unlock(&obd_dev_lock);
342 if (result == NULL && i >= class_devno_max()) {
343 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
345 GOTO(out, result = ERR_PTR(-EOVERFLOW));
351 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
352 result->obd_name, result);
356 obd_device_free(newdev);
358 class_put_type(type);
362 void class_release_dev(struct obd_device *obd)
364 struct obd_type *obd_type = obd->obd_type;
366 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
367 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
368 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
369 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
370 LASSERT(obd_type != NULL);
372 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
373 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
375 cfs_write_lock(&obd_dev_lock);
376 obd_devs[obd->obd_minor] = NULL;
377 cfs_write_unlock(&obd_dev_lock);
378 obd_device_free(obd);
380 class_put_type(obd_type);
383 int class_name2dev(const char *name)
390 cfs_read_lock(&obd_dev_lock);
391 for (i = 0; i < class_devno_max(); i++) {
392 struct obd_device *obd = class_num2obd(i);
394 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
395 /* Make sure we finished attaching before we give
396 out any references */
397 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
398 if (obd->obd_attached) {
399 cfs_read_unlock(&obd_dev_lock);
405 cfs_read_unlock(&obd_dev_lock);
409 EXPORT_SYMBOL(class_name2dev);
411 struct obd_device *class_name2obd(const char *name)
413 int dev = class_name2dev(name);
415 if (dev < 0 || dev > class_devno_max())
417 return class_num2obd(dev);
419 EXPORT_SYMBOL(class_name2obd);
421 int class_uuid2dev(struct obd_uuid *uuid)
425 cfs_read_lock(&obd_dev_lock);
426 for (i = 0; i < class_devno_max(); i++) {
427 struct obd_device *obd = class_num2obd(i);
429 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
430 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
431 cfs_read_unlock(&obd_dev_lock);
435 cfs_read_unlock(&obd_dev_lock);
439 EXPORT_SYMBOL(class_uuid2dev);
441 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
443 int dev = class_uuid2dev(uuid);
446 return class_num2obd(dev);
448 EXPORT_SYMBOL(class_uuid2obd);
451 * Get obd device from ::obd_devs[]
453 * \param num [in] array index
455 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
456 * otherwise return the obd device there.
458 struct obd_device *class_num2obd(int num)
460 struct obd_device *obd = NULL;
462 if (num < class_devno_max()) {
467 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
468 "%p obd_magic %08x != %08x\n",
469 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
470 LASSERTF(obd->obd_minor == num,
471 "%p obd_minor %0d != %0d\n",
472 obd, obd->obd_minor, num);
477 EXPORT_SYMBOL(class_num2obd);
479 void class_obd_list(void)
484 cfs_read_lock(&obd_dev_lock);
485 for (i = 0; i < class_devno_max(); i++) {
486 struct obd_device *obd = class_num2obd(i);
490 if (obd->obd_stopping)
492 else if (obd->obd_set_up)
494 else if (obd->obd_attached)
498 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
499 i, status, obd->obd_type->typ_name,
500 obd->obd_name, obd->obd_uuid.uuid,
501 cfs_atomic_read(&obd->obd_refcount));
503 cfs_read_unlock(&obd_dev_lock);
507 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
508 specified, then only the client with that uuid is returned,
509 otherwise any client connected to the tgt is returned. */
510 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
511 const char * typ_name,
512 struct obd_uuid *grp_uuid)
516 cfs_read_lock(&obd_dev_lock);
517 for (i = 0; i < class_devno_max(); i++) {
518 struct obd_device *obd = class_num2obd(i);
522 if ((strncmp(obd->obd_type->typ_name, typ_name,
523 strlen(typ_name)) == 0)) {
524 if (obd_uuid_equals(tgt_uuid,
525 &obd->u.cli.cl_target_uuid) &&
526 ((grp_uuid)? obd_uuid_equals(grp_uuid,
527 &obd->obd_uuid) : 1)) {
528 cfs_read_unlock(&obd_dev_lock);
533 cfs_read_unlock(&obd_dev_lock);
537 EXPORT_SYMBOL(class_find_client_obd);
539 /* Iterate the obd_device list looking devices have grp_uuid. Start
540 searching at *next, and if a device is found, the next index to look
541 at is saved in *next. If next is NULL, then the first matching device
542 will always be returned. */
543 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
549 else if (*next >= 0 && *next < class_devno_max())
554 cfs_read_lock(&obd_dev_lock);
555 for (; i < class_devno_max(); i++) {
556 struct obd_device *obd = class_num2obd(i);
560 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
563 cfs_read_unlock(&obd_dev_lock);
567 cfs_read_unlock(&obd_dev_lock);
571 EXPORT_SYMBOL(class_devices_in_group);
574 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
575 * adjust sptlrpc settings accordingly.
577 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
579 struct obd_device *obd;
583 LASSERT(namelen > 0);
585 cfs_read_lock(&obd_dev_lock);
586 for (i = 0; i < class_devno_max(); i++) {
587 obd = class_num2obd(i);
589 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
592 /* only notify mdc, osc, mdt, ost */
593 type = obd->obd_type->typ_name;
594 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
595 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
596 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
597 strcmp(type, LUSTRE_OST_NAME) != 0)
600 if (strncmp(obd->obd_name, fsname, namelen))
603 class_incref(obd, __FUNCTION__, obd);
604 cfs_read_unlock(&obd_dev_lock);
605 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
606 sizeof(KEY_SPTLRPC_CONF),
607 KEY_SPTLRPC_CONF, 0, NULL, NULL);
609 class_decref(obd, __FUNCTION__, obd);
610 cfs_read_lock(&obd_dev_lock);
612 cfs_read_unlock(&obd_dev_lock);
615 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
617 void obd_cleanup_caches(void)
622 if (obd_device_cachep) {
623 rc = cfs_mem_cache_destroy(obd_device_cachep);
624 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
625 obd_device_cachep = NULL;
628 rc = cfs_mem_cache_destroy(obdo_cachep);
629 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
633 rc = cfs_mem_cache_destroy(import_cachep);
634 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
635 import_cachep = NULL;
638 rc = cfs_mem_cache_destroy(capa_cachep);
639 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
645 int obd_init_caches(void)
649 LASSERT(obd_device_cachep == NULL);
650 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
651 sizeof(struct obd_device),
653 if (!obd_device_cachep)
656 LASSERT(obdo_cachep == NULL);
657 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
662 LASSERT(import_cachep == NULL);
663 import_cachep = cfs_mem_cache_create("ll_import_cache",
664 sizeof(struct obd_import),
669 LASSERT(capa_cachep == NULL);
670 capa_cachep = cfs_mem_cache_create("capa_cache",
671 sizeof(struct obd_capa), 0, 0);
677 obd_cleanup_caches();
682 /* map connection to client */
683 struct obd_export *class_conn2export(struct lustre_handle *conn)
685 struct obd_export *export;
689 CDEBUG(D_CACHE, "looking for null handle\n");
693 if (conn->cookie == -1) { /* this means assign a new connection */
694 CDEBUG(D_CACHE, "want a new connection\n");
698 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
699 export = class_handle2object(conn->cookie);
702 EXPORT_SYMBOL(class_conn2export);
704 struct obd_device *class_exp2obd(struct obd_export *exp)
710 EXPORT_SYMBOL(class_exp2obd);
712 struct obd_device *class_conn2obd(struct lustre_handle *conn)
714 struct obd_export *export;
715 export = class_conn2export(conn);
717 struct obd_device *obd = export->exp_obd;
718 class_export_put(export);
723 EXPORT_SYMBOL(class_conn2obd);
725 struct obd_import *class_exp2cliimp(struct obd_export *exp)
727 struct obd_device *obd = exp->exp_obd;
730 return obd->u.cli.cl_import;
732 EXPORT_SYMBOL(class_exp2cliimp);
734 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
736 struct obd_device *obd = class_conn2obd(conn);
739 return obd->u.cli.cl_import;
741 EXPORT_SYMBOL(class_conn2cliimp);
743 /* Export management functions */
744 static void class_export_destroy(struct obd_export *exp)
746 struct obd_device *obd = exp->exp_obd;
749 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
750 LASSERT(obd != NULL);
752 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
753 exp->exp_client_uuid.uuid, obd->obd_name);
755 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
756 if (exp->exp_connection)
757 ptlrpc_put_connection_superhack(exp->exp_connection);
759 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
760 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
761 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
762 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
763 obd_destroy_export(exp);
764 class_decref(obd, "export", exp);
766 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
770 static void export_handle_addref(void *export)
772 class_export_get(export);
775 static struct portals_handle_ops export_handle_ops = {
776 .hop_addref = export_handle_addref,
780 struct obd_export *class_export_get(struct obd_export *exp)
782 cfs_atomic_inc(&exp->exp_refcount);
783 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
784 cfs_atomic_read(&exp->exp_refcount));
787 EXPORT_SYMBOL(class_export_get);
789 void class_export_put(struct obd_export *exp)
791 LASSERT(exp != NULL);
792 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
793 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
794 cfs_atomic_read(&exp->exp_refcount) - 1);
796 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
797 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
798 CDEBUG(D_IOCTL, "final put %p/%s\n",
799 exp, exp->exp_client_uuid.uuid);
801 /* release nid stat refererence */
802 lprocfs_exp_cleanup(exp);
804 obd_zombie_export_add(exp);
807 EXPORT_SYMBOL(class_export_put);
809 /* Creates a new export, adds it to the hash table, and returns a
810 * pointer to it. The refcount is 2: one for the hash reference, and
811 * one for the pointer returned by this function. */
812 struct obd_export *class_new_export(struct obd_device *obd,
813 struct obd_uuid *cluuid)
815 struct obd_export *export;
816 cfs_hash_t *hash = NULL;
820 OBD_ALLOC_PTR(export);
822 return ERR_PTR(-ENOMEM);
824 export->exp_conn_cnt = 0;
825 export->exp_lock_hash = NULL;
826 export->exp_flock_hash = NULL;
827 cfs_atomic_set(&export->exp_refcount, 2);
828 cfs_atomic_set(&export->exp_rpc_count, 0);
829 cfs_atomic_set(&export->exp_cb_count, 0);
830 cfs_atomic_set(&export->exp_locks_count, 0);
831 #if LUSTRE_TRACKS_LOCK_EXP_REFS
832 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
833 cfs_spin_lock_init(&export->exp_locks_list_guard);
835 cfs_atomic_set(&export->exp_replay_count, 0);
836 export->exp_obd = obd;
837 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
838 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
839 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
840 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
841 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
842 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
843 class_handle_hash(&export->exp_handle, &export_handle_ops);
844 export->exp_last_request_time = cfs_time_current_sec();
845 cfs_spin_lock_init(&export->exp_lock);
846 cfs_spin_lock_init(&export->exp_rpc_lock);
847 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
848 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
849 cfs_spin_lock_init(&export->exp_bl_list_lock);
850 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
852 export->exp_sp_peer = LUSTRE_SP_ANY;
853 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
854 export->exp_client_uuid = *cluuid;
855 obd_init_export(export);
857 cfs_spin_lock(&obd->obd_dev_lock);
858 /* shouldn't happen, but might race */
859 if (obd->obd_stopping)
860 GOTO(exit_unlock, rc = -ENODEV);
862 hash = cfs_hash_getref(obd->obd_uuid_hash);
864 GOTO(exit_unlock, rc = -ENODEV);
865 cfs_spin_unlock(&obd->obd_dev_lock);
867 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
868 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
870 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
871 obd->obd_name, cluuid->uuid, rc);
872 GOTO(exit_err, rc = -EALREADY);
876 cfs_spin_lock(&obd->obd_dev_lock);
877 if (obd->obd_stopping) {
878 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
879 GOTO(exit_unlock, rc = -ENODEV);
882 class_incref(obd, "export", export);
883 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
884 cfs_list_add_tail(&export->exp_obd_chain_timed,
885 &export->exp_obd->obd_exports_timed);
886 export->exp_obd->obd_num_exports++;
887 cfs_spin_unlock(&obd->obd_dev_lock);
888 cfs_hash_putref(hash);
892 cfs_spin_unlock(&obd->obd_dev_lock);
895 cfs_hash_putref(hash);
896 class_handle_unhash(&export->exp_handle);
897 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
898 obd_destroy_export(export);
899 OBD_FREE_PTR(export);
902 EXPORT_SYMBOL(class_new_export);
904 void class_unlink_export(struct obd_export *exp)
906 class_handle_unhash(&exp->exp_handle);
908 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
909 /* delete an uuid-export hashitem from hashtables */
910 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
911 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
912 &exp->exp_client_uuid,
913 &exp->exp_uuid_hash);
915 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
916 cfs_list_del_init(&exp->exp_obd_chain_timed);
917 exp->exp_obd->obd_num_exports--;
918 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
919 class_export_put(exp);
921 EXPORT_SYMBOL(class_unlink_export);
923 /* Import management functions */
924 void class_import_destroy(struct obd_import *imp)
928 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
929 imp->imp_obd->obd_name);
931 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
933 ptlrpc_put_connection_superhack(imp->imp_connection);
935 while (!cfs_list_empty(&imp->imp_conn_list)) {
936 struct obd_import_conn *imp_conn;
938 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
939 struct obd_import_conn, oic_item);
940 cfs_list_del_init(&imp_conn->oic_item);
941 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
942 OBD_FREE(imp_conn, sizeof(*imp_conn));
945 LASSERT(imp->imp_sec == NULL);
946 class_decref(imp->imp_obd, "import", imp);
947 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
951 static void import_handle_addref(void *import)
953 class_import_get(import);
956 static struct portals_handle_ops import_handle_ops = {
957 .hop_addref = import_handle_addref,
961 struct obd_import *class_import_get(struct obd_import *import)
963 cfs_atomic_inc(&import->imp_refcount);
964 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
965 cfs_atomic_read(&import->imp_refcount),
966 import->imp_obd->obd_name);
969 EXPORT_SYMBOL(class_import_get);
971 void class_import_put(struct obd_import *imp)
975 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
976 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
978 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
979 cfs_atomic_read(&imp->imp_refcount) - 1,
980 imp->imp_obd->obd_name);
982 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
983 CDEBUG(D_INFO, "final put import %p\n", imp);
984 obd_zombie_import_add(imp);
987 /* catch possible import put race */
988 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
991 EXPORT_SYMBOL(class_import_put);
993 static void init_imp_at(struct imp_at *at) {
995 at_init(&at->iat_net_latency, 0, 0);
996 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
997 /* max service estimates are tracked on the server side, so
998 don't use the AT history here, just use the last reported
999 val. (But keep hist for proc histogram, worst_ever) */
1000 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1005 struct obd_import *class_new_import(struct obd_device *obd)
1007 struct obd_import *imp;
1009 OBD_ALLOC(imp, sizeof(*imp));
1013 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1014 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1015 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1016 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1017 cfs_spin_lock_init(&imp->imp_lock);
1018 imp->imp_last_success_conn = 0;
1019 imp->imp_state = LUSTRE_IMP_NEW;
1020 imp->imp_obd = class_incref(obd, "import", imp);
1021 cfs_mutex_init(&imp->imp_sec_mutex);
1022 cfs_waitq_init(&imp->imp_recovery_waitq);
1024 cfs_atomic_set(&imp->imp_refcount, 2);
1025 cfs_atomic_set(&imp->imp_unregistering, 0);
1026 cfs_atomic_set(&imp->imp_inflight, 0);
1027 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1028 cfs_atomic_set(&imp->imp_inval_count, 0);
1029 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1030 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1031 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1032 init_imp_at(&imp->imp_at);
1034 /* the default magic is V2, will be used in connect RPC, and
1035 * then adjusted according to the flags in request/reply. */
1036 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1040 EXPORT_SYMBOL(class_new_import);
1042 void class_destroy_import(struct obd_import *import)
1044 LASSERT(import != NULL);
1045 LASSERT(import != LP_POISON);
1047 class_handle_unhash(&import->imp_handle);
1049 cfs_spin_lock(&import->imp_lock);
1050 import->imp_generation++;
1051 cfs_spin_unlock(&import->imp_lock);
1052 class_import_put(import);
1054 EXPORT_SYMBOL(class_destroy_import);
1056 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1058 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1060 cfs_spin_lock(&exp->exp_locks_list_guard);
1062 LASSERT(lock->l_exp_refs_nr >= 0);
1064 if (lock->l_exp_refs_target != NULL &&
1065 lock->l_exp_refs_target != exp) {
1066 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1067 exp, lock, lock->l_exp_refs_target);
1069 if ((lock->l_exp_refs_nr ++) == 0) {
1070 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1071 lock->l_exp_refs_target = exp;
1073 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1074 lock, exp, lock->l_exp_refs_nr);
1075 cfs_spin_unlock(&exp->exp_locks_list_guard);
1077 EXPORT_SYMBOL(__class_export_add_lock_ref);
1079 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1081 cfs_spin_lock(&exp->exp_locks_list_guard);
1082 LASSERT(lock->l_exp_refs_nr > 0);
1083 if (lock->l_exp_refs_target != exp) {
1084 LCONSOLE_WARN("lock %p, "
1085 "mismatching export pointers: %p, %p\n",
1086 lock, lock->l_exp_refs_target, exp);
1088 if (-- lock->l_exp_refs_nr == 0) {
1089 cfs_list_del_init(&lock->l_exp_refs_link);
1090 lock->l_exp_refs_target = NULL;
1092 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1093 lock, exp, lock->l_exp_refs_nr);
1094 cfs_spin_unlock(&exp->exp_locks_list_guard);
1096 EXPORT_SYMBOL(__class_export_del_lock_ref);
1099 /* A connection defines an export context in which preallocation can
1100 be managed. This releases the export pointer reference, and returns
1101 the export handle, so the export refcount is 1 when this function
1103 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1104 struct obd_uuid *cluuid)
1106 struct obd_export *export;
1107 LASSERT(conn != NULL);
1108 LASSERT(obd != NULL);
1109 LASSERT(cluuid != NULL);
1112 export = class_new_export(obd, cluuid);
1114 RETURN(PTR_ERR(export));
1116 conn->cookie = export->exp_handle.h_cookie;
1117 class_export_put(export);
1119 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1120 cluuid->uuid, conn->cookie);
1123 EXPORT_SYMBOL(class_connect);
1125 /* if export is involved in recovery then clean up related things */
1126 void class_export_recovery_cleanup(struct obd_export *exp)
1128 struct obd_device *obd = exp->exp_obd;
1130 cfs_spin_lock(&obd->obd_recovery_task_lock);
1131 if (exp->exp_delayed)
1132 obd->obd_delayed_clients--;
1133 if (obd->obd_recovering && exp->exp_in_recovery) {
1134 cfs_spin_lock(&exp->exp_lock);
1135 exp->exp_in_recovery = 0;
1136 cfs_spin_unlock(&exp->exp_lock);
1137 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1138 cfs_atomic_dec(&obd->obd_connected_clients);
1140 cfs_spin_unlock(&obd->obd_recovery_task_lock);
1141 /** Cleanup req replay fields */
1142 if (exp->exp_req_replay_needed) {
1143 cfs_spin_lock(&exp->exp_lock);
1144 exp->exp_req_replay_needed = 0;
1145 cfs_spin_unlock(&exp->exp_lock);
1146 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1147 cfs_atomic_dec(&obd->obd_req_replay_clients);
1149 /** Cleanup lock replay data */
1150 if (exp->exp_lock_replay_needed) {
1151 cfs_spin_lock(&exp->exp_lock);
1152 exp->exp_lock_replay_needed = 0;
1153 cfs_spin_unlock(&exp->exp_lock);
1154 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1155 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1159 /* This function removes 1-3 references from the export:
1160 * 1 - for export pointer passed
1161 * and if disconnect really need
1162 * 2 - removing from hash
1163 * 3 - in client_unlink_export
1164 * The export pointer passed to this function can destroyed */
1165 int class_disconnect(struct obd_export *export)
1167 int already_disconnected;
1170 if (export == NULL) {
1171 CWARN("attempting to free NULL export %p\n", export);
1175 cfs_spin_lock(&export->exp_lock);
1176 already_disconnected = export->exp_disconnected;
1177 export->exp_disconnected = 1;
1178 cfs_spin_unlock(&export->exp_lock);
1180 /* class_cleanup(), abort_recovery(), and class_fail_export()
1181 * all end up in here, and if any of them race we shouldn't
1182 * call extra class_export_puts(). */
1183 if (already_disconnected) {
1184 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1185 GOTO(no_disconn, already_disconnected);
1188 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1189 export->exp_handle.h_cookie);
1191 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1192 cfs_hash_del(export->exp_obd->obd_nid_hash,
1193 &export->exp_connection->c_peer.nid,
1194 &export->exp_nid_hash);
1196 class_export_recovery_cleanup(export);
1197 class_unlink_export(export);
1199 class_export_put(export);
1202 EXPORT_SYMBOL(class_disconnect);
1204 /* Return non-zero for a fully connected export */
1205 int class_connected_export(struct obd_export *exp)
1209 cfs_spin_lock(&exp->exp_lock);
1210 connected = (exp->exp_conn_cnt > 0);
1211 cfs_spin_unlock(&exp->exp_lock);
1216 EXPORT_SYMBOL(class_connected_export);
1218 static void class_disconnect_export_list(cfs_list_t *list,
1219 enum obd_option flags)
1222 struct obd_export *exp;
1225 /* It's possible that an export may disconnect itself, but
1226 * nothing else will be added to this list. */
1227 while (!cfs_list_empty(list)) {
1228 exp = cfs_list_entry(list->next, struct obd_export,
1230 /* need for safe call CDEBUG after obd_disconnect */
1231 class_export_get(exp);
1233 cfs_spin_lock(&exp->exp_lock);
1234 exp->exp_flags = flags;
1235 cfs_spin_unlock(&exp->exp_lock);
1237 if (obd_uuid_equals(&exp->exp_client_uuid,
1238 &exp->exp_obd->obd_uuid)) {
1240 "exp %p export uuid == obd uuid, don't discon\n",
1242 /* Need to delete this now so we don't end up pointing
1243 * to work_list later when this export is cleaned up. */
1244 cfs_list_del_init(&exp->exp_obd_chain);
1245 class_export_put(exp);
1249 class_export_get(exp);
1250 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1251 "last request at "CFS_TIME_T"\n",
1252 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1253 exp, exp->exp_last_request_time);
1254 /* release one export reference anyway */
1255 rc = obd_disconnect(exp);
1257 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1258 obd_export_nid2str(exp), exp, rc);
1259 class_export_put(exp);
1264 void class_disconnect_exports(struct obd_device *obd)
1266 cfs_list_t work_list;
1269 /* Move all of the exports from obd_exports to a work list, en masse. */
1270 CFS_INIT_LIST_HEAD(&work_list);
1271 cfs_spin_lock(&obd->obd_dev_lock);
1272 cfs_list_splice_init(&obd->obd_exports, &work_list);
1273 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1274 cfs_spin_unlock(&obd->obd_dev_lock);
1276 if (!cfs_list_empty(&work_list)) {
1277 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1278 "disconnecting them\n", obd->obd_minor, obd);
1279 class_disconnect_export_list(&work_list,
1280 exp_flags_from_obd(obd));
1282 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1283 obd->obd_minor, obd);
1286 EXPORT_SYMBOL(class_disconnect_exports);
1288 /* Remove exports that have not completed recovery.
1290 void class_disconnect_stale_exports(struct obd_device *obd,
1291 int (*test_export)(struct obd_export *))
1293 cfs_list_t work_list;
1294 struct obd_export *exp, *n;
1298 CFS_INIT_LIST_HEAD(&work_list);
1299 cfs_spin_lock(&obd->obd_dev_lock);
1300 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1302 /* don't count self-export as client */
1303 if (obd_uuid_equals(&exp->exp_client_uuid,
1304 &exp->exp_obd->obd_uuid))
1307 /* don't evict clients which have no slot in last_rcvd
1308 * (e.g. lightweight connection) */
1309 if (exp->exp_target_data.ted_lr_idx == -1)
1312 cfs_spin_lock(&exp->exp_lock);
1313 if (test_export(exp)) {
1314 cfs_spin_unlock(&exp->exp_lock);
1317 exp->exp_failed = 1;
1318 cfs_spin_unlock(&exp->exp_lock);
1320 cfs_list_move(&exp->exp_obd_chain, &work_list);
1322 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1323 obd->obd_name, exp->exp_client_uuid.uuid,
1324 exp->exp_connection == NULL ? "<unknown>" :
1325 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1326 print_export_data(exp, "EVICTING", 0);
1328 cfs_spin_unlock(&obd->obd_dev_lock);
1331 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1332 obd->obd_name, evicted);
1333 obd->obd_stale_clients += evicted;
1335 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1336 OBD_OPT_ABORT_RECOV);
1339 EXPORT_SYMBOL(class_disconnect_stale_exports);
1341 void class_fail_export(struct obd_export *exp)
1343 int rc, already_failed;
1345 cfs_spin_lock(&exp->exp_lock);
1346 already_failed = exp->exp_failed;
1347 exp->exp_failed = 1;
1348 cfs_spin_unlock(&exp->exp_lock);
1350 if (already_failed) {
1351 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1352 exp, exp->exp_client_uuid.uuid);
1356 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1357 exp, exp->exp_client_uuid.uuid);
1359 if (obd_dump_on_timeout)
1360 libcfs_debug_dumplog();
1362 /* need for safe call CDEBUG after obd_disconnect */
1363 class_export_get(exp);
1365 /* Most callers into obd_disconnect are removing their own reference
1366 * (request, for example) in addition to the one from the hash table.
1367 * We don't have such a reference here, so make one. */
1368 class_export_get(exp);
1369 rc = obd_disconnect(exp);
1371 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1373 CDEBUG(D_HA, "disconnected export %p/%s\n",
1374 exp, exp->exp_client_uuid.uuid);
1375 class_export_put(exp);
1377 EXPORT_SYMBOL(class_fail_export);
1379 char *obd_export_nid2str(struct obd_export *exp)
1381 if (exp->exp_connection != NULL)
1382 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1386 EXPORT_SYMBOL(obd_export_nid2str);
1388 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1390 struct obd_export *doomed_exp = NULL;
1391 int exports_evicted = 0;
1393 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1396 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1397 if (doomed_exp == NULL)
1400 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1401 "nid %s found, wanted nid %s, requested nid %s\n",
1402 obd_export_nid2str(doomed_exp),
1403 libcfs_nid2str(nid_key), nid);
1404 LASSERTF(doomed_exp != obd->obd_self_export,
1405 "self-export is hashed by NID?\n");
1407 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1408 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1410 class_fail_export(doomed_exp);
1411 class_export_put(doomed_exp);
1414 if (!exports_evicted)
1415 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1416 obd->obd_name, nid);
1417 return exports_evicted;
1419 EXPORT_SYMBOL(obd_export_evict_by_nid);
1421 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1423 struct obd_export *doomed_exp = NULL;
1424 struct obd_uuid doomed_uuid;
1425 int exports_evicted = 0;
1427 obd_str2uuid(&doomed_uuid, uuid);
1428 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1429 CERROR("%s: can't evict myself\n", obd->obd_name);
1430 return exports_evicted;
1433 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1435 if (doomed_exp == NULL) {
1436 CERROR("%s: can't disconnect %s: no exports found\n",
1437 obd->obd_name, uuid);
1439 CWARN("%s: evicting %s at adminstrative request\n",
1440 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1441 class_fail_export(doomed_exp);
1442 class_export_put(doomed_exp);
1446 return exports_evicted;
1448 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1450 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1451 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1452 EXPORT_SYMBOL(class_export_dump_hook);
1455 static void print_export_data(struct obd_export *exp, const char *status,
1458 struct ptlrpc_reply_state *rs;
1459 struct ptlrpc_reply_state *first_reply = NULL;
1462 cfs_spin_lock(&exp->exp_lock);
1463 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1469 cfs_spin_unlock(&exp->exp_lock);
1471 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1472 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1473 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1474 cfs_atomic_read(&exp->exp_rpc_count),
1475 cfs_atomic_read(&exp->exp_cb_count),
1476 cfs_atomic_read(&exp->exp_locks_count),
1477 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1478 nreplies, first_reply, nreplies > 3 ? "..." : "",
1479 exp->exp_last_committed);
1480 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1481 if (locks && class_export_dump_hook != NULL)
1482 class_export_dump_hook(exp);
1486 void dump_exports(struct obd_device *obd, int locks)
1488 struct obd_export *exp;
1490 cfs_spin_lock(&obd->obd_dev_lock);
1491 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1492 print_export_data(exp, "ACTIVE", locks);
1493 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1494 print_export_data(exp, "UNLINKED", locks);
1495 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1496 print_export_data(exp, "DELAYED", locks);
1497 cfs_spin_unlock(&obd->obd_dev_lock);
1498 cfs_spin_lock(&obd_zombie_impexp_lock);
1499 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1500 print_export_data(exp, "ZOMBIE", locks);
1501 cfs_spin_unlock(&obd_zombie_impexp_lock);
1503 EXPORT_SYMBOL(dump_exports);
1505 void obd_exports_barrier(struct obd_device *obd)
1508 LASSERT(cfs_list_empty(&obd->obd_exports));
1509 cfs_spin_lock(&obd->obd_dev_lock);
1510 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1511 cfs_spin_unlock(&obd->obd_dev_lock);
1512 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1513 cfs_time_seconds(waited));
1514 if (waited > 5 && IS_PO2(waited)) {
1515 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1516 "more than %d seconds. "
1517 "The obd refcount = %d. Is it stuck?\n",
1518 obd->obd_name, waited,
1519 cfs_atomic_read(&obd->obd_refcount));
1520 dump_exports(obd, 1);
1523 cfs_spin_lock(&obd->obd_dev_lock);
1525 cfs_spin_unlock(&obd->obd_dev_lock);
1527 EXPORT_SYMBOL(obd_exports_barrier);
1529 /* Total amount of zombies to be destroyed */
1530 static int zombies_count = 0;
1533 * kill zombie imports and exports
1535 void obd_zombie_impexp_cull(void)
1537 struct obd_import *import;
1538 struct obd_export *export;
1542 cfs_spin_lock(&obd_zombie_impexp_lock);
1545 if (!cfs_list_empty(&obd_zombie_imports)) {
1546 import = cfs_list_entry(obd_zombie_imports.next,
1549 cfs_list_del_init(&import->imp_zombie_chain);
1553 if (!cfs_list_empty(&obd_zombie_exports)) {
1554 export = cfs_list_entry(obd_zombie_exports.next,
1557 cfs_list_del_init(&export->exp_obd_chain);
1560 cfs_spin_unlock(&obd_zombie_impexp_lock);
1562 if (import != NULL) {
1563 class_import_destroy(import);
1564 cfs_spin_lock(&obd_zombie_impexp_lock);
1566 cfs_spin_unlock(&obd_zombie_impexp_lock);
1569 if (export != NULL) {
1570 class_export_destroy(export);
1571 cfs_spin_lock(&obd_zombie_impexp_lock);
1573 cfs_spin_unlock(&obd_zombie_impexp_lock);
1577 } while (import != NULL || export != NULL);
1581 static cfs_completion_t obd_zombie_start;
1582 static cfs_completion_t obd_zombie_stop;
1583 static unsigned long obd_zombie_flags;
1584 static cfs_waitq_t obd_zombie_waitq;
1585 static pid_t obd_zombie_pid;
1588 OBD_ZOMBIE_STOP = 1 << 1
1592 * check for work for kill zombie import/export thread.
1594 static int obd_zombie_impexp_check(void *arg)
1598 cfs_spin_lock(&obd_zombie_impexp_lock);
1599 rc = (zombies_count == 0) &&
1600 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1601 cfs_spin_unlock(&obd_zombie_impexp_lock);
1607 * Add export to the obd_zombe thread and notify it.
1609 static void obd_zombie_export_add(struct obd_export *exp) {
1610 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1611 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1612 cfs_list_del_init(&exp->exp_obd_chain);
1613 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1614 cfs_spin_lock(&obd_zombie_impexp_lock);
1616 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1617 cfs_spin_unlock(&obd_zombie_impexp_lock);
1619 obd_zombie_impexp_notify();
1623 * Add import to the obd_zombe thread and notify it.
1625 static void obd_zombie_import_add(struct obd_import *imp) {
1626 LASSERT(imp->imp_sec == NULL);
1627 LASSERT(imp->imp_rq_pool == NULL);
1628 cfs_spin_lock(&obd_zombie_impexp_lock);
1629 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1631 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1632 cfs_spin_unlock(&obd_zombie_impexp_lock);
1634 obd_zombie_impexp_notify();
1638 * notify import/export destroy thread about new zombie.
1640 static void obd_zombie_impexp_notify(void)
1643 * Make sure obd_zomebie_impexp_thread get this notification.
1644 * It is possible this signal only get by obd_zombie_barrier, and
1645 * barrier gulps this notification and sleeps away and hangs ensues
1647 cfs_waitq_broadcast(&obd_zombie_waitq);
1651 * check whether obd_zombie is idle
1653 static int obd_zombie_is_idle(void)
1657 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1658 cfs_spin_lock(&obd_zombie_impexp_lock);
1659 rc = (zombies_count == 0);
1660 cfs_spin_unlock(&obd_zombie_impexp_lock);
1665 * wait when obd_zombie import/export queues become empty
1667 void obd_zombie_barrier(void)
1669 struct l_wait_info lwi = { 0 };
1671 if (obd_zombie_pid == cfs_curproc_pid())
1672 /* don't wait for myself */
1674 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1676 EXPORT_SYMBOL(obd_zombie_barrier);
1681 * destroy zombie export/import thread.
1683 static int obd_zombie_impexp_thread(void *unused)
1687 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1688 cfs_complete(&obd_zombie_start);
1692 cfs_complete(&obd_zombie_start);
1694 obd_zombie_pid = cfs_curproc_pid();
1696 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1697 struct l_wait_info lwi = { 0 };
1699 l_wait_event(obd_zombie_waitq,
1700 !obd_zombie_impexp_check(NULL), &lwi);
1701 obd_zombie_impexp_cull();
1704 * Notify obd_zombie_barrier callers that queues
1707 cfs_waitq_signal(&obd_zombie_waitq);
1710 cfs_complete(&obd_zombie_stop);
1715 #else /* ! KERNEL */
1717 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1718 static void *obd_zombie_impexp_work_cb;
1719 static void *obd_zombie_impexp_idle_cb;
1721 int obd_zombie_impexp_kill(void *arg)
1725 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1726 obd_zombie_impexp_cull();
1729 cfs_atomic_dec(&zombie_recur);
1736 * start destroy zombie import/export thread
1738 int obd_zombie_impexp_init(void)
1742 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1743 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1744 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1745 cfs_init_completion(&obd_zombie_start);
1746 cfs_init_completion(&obd_zombie_stop);
1747 cfs_waitq_init(&obd_zombie_waitq);
1751 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1755 cfs_wait_for_completion(&obd_zombie_start);
1758 obd_zombie_impexp_work_cb =
1759 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1760 &obd_zombie_impexp_kill, NULL);
1762 obd_zombie_impexp_idle_cb =
1763 liblustre_register_idle_callback("obd_zombi_impexp_check",
1764 &obd_zombie_impexp_check, NULL);
1770 * stop destroy zombie import/export thread
1772 void obd_zombie_impexp_stop(void)
1774 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1775 obd_zombie_impexp_notify();
1777 cfs_wait_for_completion(&obd_zombie_stop);
1779 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1780 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1784 /***** Kernel-userspace comm helpers *******/
1786 /* Get length of entire message, including header */
1787 int kuc_len(int payload_len)
1789 return sizeof(struct kuc_hdr) + payload_len;
1791 EXPORT_SYMBOL(kuc_len);
1793 /* Get a pointer to kuc header, given a ptr to the payload
1794 * @param p Pointer to payload area
1795 * @returns Pointer to kuc header
1797 struct kuc_hdr * kuc_ptr(void *p)
1799 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1800 LASSERT(lh->kuc_magic == KUC_MAGIC);
1803 EXPORT_SYMBOL(kuc_ptr);
1805 /* Test if payload is part of kuc message
1806 * @param p Pointer to payload area
1809 int kuc_ispayload(void *p)
1811 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1813 if (kh->kuc_magic == KUC_MAGIC)
1818 EXPORT_SYMBOL(kuc_ispayload);
1820 /* Alloc space for a message, and fill in header
1821 * @return Pointer to payload area
1823 void *kuc_alloc(int payload_len, int transport, int type)
1826 int len = kuc_len(payload_len);
1830 return ERR_PTR(-ENOMEM);
1832 lh->kuc_magic = KUC_MAGIC;
1833 lh->kuc_transport = transport;
1834 lh->kuc_msgtype = type;
1835 lh->kuc_msglen = len;
1837 return (void *)(lh + 1);
1839 EXPORT_SYMBOL(kuc_alloc);
1841 /* Takes pointer to payload area */
1842 inline void kuc_free(void *p, int payload_len)
1844 struct kuc_hdr *lh = kuc_ptr(p);
1845 OBD_FREE(lh, kuc_len(payload_len));
1847 EXPORT_SYMBOL(kuc_free);