4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 cfs_spinlock_t obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 struct obd_type *class_search_type(const char *name)
102 struct obd_type *type;
104 cfs_spin_lock(&obd_types_lock);
105 cfs_list_for_each(tmp, &obd_types) {
106 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 cfs_spin_unlock(&obd_types_lock);
112 cfs_spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
128 if (!cfs_request_module("%s", modname)) {
129 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
130 type = class_search_type(name);
132 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
138 cfs_spin_lock(&type->obd_type_lock);
140 cfs_try_module_get(type->typ_dt_ops->o_owner);
141 cfs_spin_unlock(&type->obd_type_lock);
145 EXPORT_SYMBOL(class_get_type);
147 void class_put_type(struct obd_type *type)
150 cfs_spin_lock(&type->obd_type_lock);
152 cfs_module_put(type->typ_dt_ops->o_owner);
153 cfs_spin_unlock(&type->obd_type_lock);
155 EXPORT_SYMBOL(class_put_type);
157 #define CLASS_MAX_NAME 1024
159 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
160 struct lprocfs_vars *vars, const char *name,
161 struct lu_device_type *ldt)
163 struct obd_type *type;
168 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
170 if (class_search_type(name)) {
171 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
176 OBD_ALLOC(type, sizeof(*type));
180 OBD_ALLOC_PTR(type->typ_dt_ops);
181 OBD_ALLOC_PTR(type->typ_md_ops);
182 OBD_ALLOC(type->typ_name, strlen(name) + 1);
184 if (type->typ_dt_ops == NULL ||
185 type->typ_md_ops == NULL ||
186 type->typ_name == NULL)
189 *(type->typ_dt_ops) = *dt_ops;
190 /* md_ops is optional */
192 *(type->typ_md_ops) = *md_ops;
193 strcpy(type->typ_name, name);
194 cfs_spin_lock_init(&type->obd_type_lock);
197 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
199 if (IS_ERR(type->typ_procroot)) {
200 rc = PTR_ERR(type->typ_procroot);
201 type->typ_procroot = NULL;
207 rc = lu_device_type_init(ldt);
212 cfs_spin_lock(&obd_types_lock);
213 cfs_list_add(&type->typ_chain, &obd_types);
214 cfs_spin_unlock(&obd_types_lock);
219 if (type->typ_name != NULL)
220 OBD_FREE(type->typ_name, strlen(name) + 1);
221 if (type->typ_md_ops != NULL)
222 OBD_FREE_PTR(type->typ_md_ops);
223 if (type->typ_dt_ops != NULL)
224 OBD_FREE_PTR(type->typ_dt_ops);
225 OBD_FREE(type, sizeof(*type));
228 EXPORT_SYMBOL(class_register_type);
230 int class_unregister_type(const char *name)
232 struct obd_type *type = class_search_type(name);
236 CERROR("unknown obd type\n");
240 if (type->typ_refcnt) {
241 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
242 /* This is a bad situation, let's make the best of it */
243 /* Remove ops, but leave the name for debugging */
244 OBD_FREE_PTR(type->typ_dt_ops);
245 OBD_FREE_PTR(type->typ_md_ops);
249 /* we do not use type->typ_procroot as for compatibility purposes
250 * other modules can share names (i.e. lod can use lov entry). so
251 * we can't reference pointer as it can get invalided when another
252 * module removes the entry */
253 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
256 lu_device_type_fini(type->typ_lu);
258 cfs_spin_lock(&obd_types_lock);
259 cfs_list_del(&type->typ_chain);
260 cfs_spin_unlock(&obd_types_lock);
261 OBD_FREE(type->typ_name, strlen(name) + 1);
262 if (type->typ_dt_ops != NULL)
263 OBD_FREE_PTR(type->typ_dt_ops);
264 if (type->typ_md_ops != NULL)
265 OBD_FREE_PTR(type->typ_md_ops);
266 OBD_FREE(type, sizeof(*type));
268 } /* class_unregister_type */
269 EXPORT_SYMBOL(class_unregister_type);
272 * Create a new obd device.
274 * Find an empty slot in ::obd_devs[], create a new obd device in it.
276 * \param[in] type_name obd device type string.
277 * \param[in] name obd device name.
279 * \retval NULL if create fails, otherwise return the obd device
282 struct obd_device *class_newdev(const char *type_name, const char *name)
284 struct obd_device *result = NULL;
285 struct obd_device *newdev;
286 struct obd_type *type = NULL;
288 int new_obd_minor = 0;
291 if (strlen(name) >= MAX_OBD_NAME) {
292 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
293 RETURN(ERR_PTR(-EINVAL));
296 type = class_get_type(type_name);
298 CERROR("OBD: unknown type: %s\n", type_name);
299 RETURN(ERR_PTR(-ENODEV));
302 newdev = obd_device_alloc();
303 if (newdev == NULL) {
304 class_put_type(type);
305 RETURN(ERR_PTR(-ENOMEM));
307 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
309 cfs_write_lock(&obd_dev_lock);
310 for (i = 0; i < class_devno_max(); i++) {
311 struct obd_device *obd = class_num2obd(i);
313 if (obd && obd->obd_name &&
314 (strcmp(name, obd->obd_name) == 0)) {
315 CERROR("Device %s already exists at %d, won't add\n",
318 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
319 "%p obd_magic %08x != %08x\n", result,
320 result->obd_magic, OBD_DEVICE_MAGIC);
321 LASSERTF(result->obd_minor == new_obd_minor,
322 "%p obd_minor %d != %d\n", result,
323 result->obd_minor, new_obd_minor);
325 obd_devs[result->obd_minor] = NULL;
326 result->obd_name[0]='\0';
328 result = ERR_PTR(-EEXIST);
331 if (!result && !obd) {
333 result->obd_minor = i;
335 result->obd_type = type;
336 strncpy(result->obd_name, name,
337 sizeof(result->obd_name) - 1);
338 obd_devs[i] = result;
341 cfs_write_unlock(&obd_dev_lock);
343 if (result == NULL && i >= class_devno_max()) {
344 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
346 RETURN(ERR_PTR(-EOVERFLOW));
349 if (IS_ERR(result)) {
350 obd_device_free(newdev);
351 class_put_type(type);
353 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
354 result->obd_name, result);
359 void class_release_dev(struct obd_device *obd)
361 struct obd_type *obd_type = obd->obd_type;
363 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
364 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
365 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
366 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
367 LASSERT(obd_type != NULL);
369 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
370 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
372 cfs_write_lock(&obd_dev_lock);
373 obd_devs[obd->obd_minor] = NULL;
374 cfs_write_unlock(&obd_dev_lock);
375 obd_device_free(obd);
377 class_put_type(obd_type);
380 int class_name2dev(const char *name)
387 cfs_read_lock(&obd_dev_lock);
388 for (i = 0; i < class_devno_max(); i++) {
389 struct obd_device *obd = class_num2obd(i);
391 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
392 /* Make sure we finished attaching before we give
393 out any references */
394 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
395 if (obd->obd_attached) {
396 cfs_read_unlock(&obd_dev_lock);
402 cfs_read_unlock(&obd_dev_lock);
406 EXPORT_SYMBOL(class_name2dev);
408 struct obd_device *class_name2obd(const char *name)
410 int dev = class_name2dev(name);
412 if (dev < 0 || dev > class_devno_max())
414 return class_num2obd(dev);
416 EXPORT_SYMBOL(class_name2obd);
418 int class_uuid2dev(struct obd_uuid *uuid)
422 cfs_read_lock(&obd_dev_lock);
423 for (i = 0; i < class_devno_max(); i++) {
424 struct obd_device *obd = class_num2obd(i);
426 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
427 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
428 cfs_read_unlock(&obd_dev_lock);
432 cfs_read_unlock(&obd_dev_lock);
436 EXPORT_SYMBOL(class_uuid2dev);
438 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
440 int dev = class_uuid2dev(uuid);
443 return class_num2obd(dev);
445 EXPORT_SYMBOL(class_uuid2obd);
448 * Get obd device from ::obd_devs[]
450 * \param num [in] array index
452 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
453 * otherwise return the obd device there.
455 struct obd_device *class_num2obd(int num)
457 struct obd_device *obd = NULL;
459 if (num < class_devno_max()) {
464 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
465 "%p obd_magic %08x != %08x\n",
466 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
467 LASSERTF(obd->obd_minor == num,
468 "%p obd_minor %0d != %0d\n",
469 obd, obd->obd_minor, num);
474 EXPORT_SYMBOL(class_num2obd);
476 void class_obd_list(void)
481 cfs_read_lock(&obd_dev_lock);
482 for (i = 0; i < class_devno_max(); i++) {
483 struct obd_device *obd = class_num2obd(i);
487 if (obd->obd_stopping)
489 else if (obd->obd_set_up)
491 else if (obd->obd_attached)
495 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
496 i, status, obd->obd_type->typ_name,
497 obd->obd_name, obd->obd_uuid.uuid,
498 cfs_atomic_read(&obd->obd_refcount));
500 cfs_read_unlock(&obd_dev_lock);
504 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
505 specified, then only the client with that uuid is returned,
506 otherwise any client connected to the tgt is returned. */
507 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
508 const char * typ_name,
509 struct obd_uuid *grp_uuid)
513 cfs_read_lock(&obd_dev_lock);
514 for (i = 0; i < class_devno_max(); i++) {
515 struct obd_device *obd = class_num2obd(i);
519 if ((strncmp(obd->obd_type->typ_name, typ_name,
520 strlen(typ_name)) == 0)) {
521 if (obd_uuid_equals(tgt_uuid,
522 &obd->u.cli.cl_target_uuid) &&
523 ((grp_uuid)? obd_uuid_equals(grp_uuid,
524 &obd->obd_uuid) : 1)) {
525 cfs_read_unlock(&obd_dev_lock);
530 cfs_read_unlock(&obd_dev_lock);
534 EXPORT_SYMBOL(class_find_client_obd);
536 /* Iterate the obd_device list looking devices have grp_uuid. Start
537 searching at *next, and if a device is found, the next index to look
538 at is saved in *next. If next is NULL, then the first matching device
539 will always be returned. */
540 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
546 else if (*next >= 0 && *next < class_devno_max())
551 cfs_read_lock(&obd_dev_lock);
552 for (; i < class_devno_max(); i++) {
553 struct obd_device *obd = class_num2obd(i);
557 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
560 cfs_read_unlock(&obd_dev_lock);
564 cfs_read_unlock(&obd_dev_lock);
568 EXPORT_SYMBOL(class_devices_in_group);
571 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
572 * adjust sptlrpc settings accordingly.
574 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
576 struct obd_device *obd;
580 LASSERT(namelen > 0);
582 cfs_read_lock(&obd_dev_lock);
583 for (i = 0; i < class_devno_max(); i++) {
584 obd = class_num2obd(i);
586 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
589 /* only notify mdc, osc, mdt, ost */
590 type = obd->obd_type->typ_name;
591 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
592 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
593 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
594 strcmp(type, LUSTRE_OST_NAME) != 0)
597 if (strncmp(obd->obd_name, fsname, namelen))
600 class_incref(obd, __FUNCTION__, obd);
601 cfs_read_unlock(&obd_dev_lock);
602 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
603 sizeof(KEY_SPTLRPC_CONF),
604 KEY_SPTLRPC_CONF, 0, NULL, NULL);
606 class_decref(obd, __FUNCTION__, obd);
607 cfs_read_lock(&obd_dev_lock);
609 cfs_read_unlock(&obd_dev_lock);
612 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
614 void obd_cleanup_caches(void)
619 if (obd_device_cachep) {
620 rc = cfs_mem_cache_destroy(obd_device_cachep);
621 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
622 obd_device_cachep = NULL;
625 rc = cfs_mem_cache_destroy(obdo_cachep);
626 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
630 rc = cfs_mem_cache_destroy(import_cachep);
631 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
632 import_cachep = NULL;
635 rc = cfs_mem_cache_destroy(capa_cachep);
636 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
642 int obd_init_caches(void)
646 LASSERT(obd_device_cachep == NULL);
647 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
648 sizeof(struct obd_device),
650 if (!obd_device_cachep)
653 LASSERT(obdo_cachep == NULL);
654 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
659 LASSERT(import_cachep == NULL);
660 import_cachep = cfs_mem_cache_create("ll_import_cache",
661 sizeof(struct obd_import),
666 LASSERT(capa_cachep == NULL);
667 capa_cachep = cfs_mem_cache_create("capa_cache",
668 sizeof(struct obd_capa), 0, 0);
674 obd_cleanup_caches();
679 /* map connection to client */
680 struct obd_export *class_conn2export(struct lustre_handle *conn)
682 struct obd_export *export;
686 CDEBUG(D_CACHE, "looking for null handle\n");
690 if (conn->cookie == -1) { /* this means assign a new connection */
691 CDEBUG(D_CACHE, "want a new connection\n");
695 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
696 export = class_handle2object(conn->cookie);
699 EXPORT_SYMBOL(class_conn2export);
701 struct obd_device *class_exp2obd(struct obd_export *exp)
707 EXPORT_SYMBOL(class_exp2obd);
709 struct obd_device *class_conn2obd(struct lustre_handle *conn)
711 struct obd_export *export;
712 export = class_conn2export(conn);
714 struct obd_device *obd = export->exp_obd;
715 class_export_put(export);
720 EXPORT_SYMBOL(class_conn2obd);
722 struct obd_import *class_exp2cliimp(struct obd_export *exp)
724 struct obd_device *obd = exp->exp_obd;
727 return obd->u.cli.cl_import;
729 EXPORT_SYMBOL(class_exp2cliimp);
731 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
733 struct obd_device *obd = class_conn2obd(conn);
736 return obd->u.cli.cl_import;
738 EXPORT_SYMBOL(class_conn2cliimp);
740 /* Export management functions */
741 static void class_export_destroy(struct obd_export *exp)
743 struct obd_device *obd = exp->exp_obd;
746 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
747 LASSERT(obd != NULL);
749 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
750 exp->exp_client_uuid.uuid, obd->obd_name);
752 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
753 if (exp->exp_connection)
754 ptlrpc_put_connection_superhack(exp->exp_connection);
756 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
757 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
758 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
759 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
760 obd_destroy_export(exp);
761 class_decref(obd, "export", exp);
763 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
767 static void export_handle_addref(void *export)
769 class_export_get(export);
772 static struct portals_handle_ops export_handle_ops = {
773 .hop_addref = export_handle_addref,
777 struct obd_export *class_export_get(struct obd_export *exp)
779 cfs_atomic_inc(&exp->exp_refcount);
780 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
781 cfs_atomic_read(&exp->exp_refcount));
784 EXPORT_SYMBOL(class_export_get);
786 void class_export_put(struct obd_export *exp)
788 LASSERT(exp != NULL);
789 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
790 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
791 cfs_atomic_read(&exp->exp_refcount) - 1);
793 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
794 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
795 CDEBUG(D_IOCTL, "final put %p/%s\n",
796 exp, exp->exp_client_uuid.uuid);
798 /* release nid stat refererence */
799 lprocfs_exp_cleanup(exp);
801 obd_zombie_export_add(exp);
804 EXPORT_SYMBOL(class_export_put);
806 /* Creates a new export, adds it to the hash table, and returns a
807 * pointer to it. The refcount is 2: one for the hash reference, and
808 * one for the pointer returned by this function. */
809 struct obd_export *class_new_export(struct obd_device *obd,
810 struct obd_uuid *cluuid)
812 struct obd_export *export;
813 cfs_hash_t *hash = NULL;
817 OBD_ALLOC_PTR(export);
819 return ERR_PTR(-ENOMEM);
821 export->exp_conn_cnt = 0;
822 export->exp_lock_hash = NULL;
823 export->exp_flock_hash = NULL;
824 cfs_atomic_set(&export->exp_refcount, 2);
825 cfs_atomic_set(&export->exp_rpc_count, 0);
826 cfs_atomic_set(&export->exp_cb_count, 0);
827 cfs_atomic_set(&export->exp_locks_count, 0);
828 #if LUSTRE_TRACKS_LOCK_EXP_REFS
829 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
830 cfs_spin_lock_init(&export->exp_locks_list_guard);
832 cfs_atomic_set(&export->exp_replay_count, 0);
833 export->exp_obd = obd;
834 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
835 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
836 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
837 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
838 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
839 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
840 class_handle_hash(&export->exp_handle, &export_handle_ops);
841 export->exp_last_request_time = cfs_time_current_sec();
842 cfs_spin_lock_init(&export->exp_lock);
843 cfs_spin_lock_init(&export->exp_rpc_lock);
844 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
845 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
846 cfs_spin_lock_init(&export->exp_bl_list_lock);
847 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
849 export->exp_sp_peer = LUSTRE_SP_ANY;
850 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
851 export->exp_client_uuid = *cluuid;
852 obd_init_export(export);
854 cfs_spin_lock(&obd->obd_dev_lock);
855 /* shouldn't happen, but might race */
856 if (obd->obd_stopping)
857 GOTO(exit_unlock, rc = -ENODEV);
859 hash = cfs_hash_getref(obd->obd_uuid_hash);
861 GOTO(exit_unlock, rc = -ENODEV);
862 cfs_spin_unlock(&obd->obd_dev_lock);
864 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
865 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
867 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
868 obd->obd_name, cluuid->uuid, rc);
869 GOTO(exit_err, rc = -EALREADY);
873 cfs_spin_lock(&obd->obd_dev_lock);
874 if (obd->obd_stopping) {
875 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
876 GOTO(exit_unlock, rc = -ENODEV);
879 class_incref(obd, "export", export);
880 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
881 cfs_list_add_tail(&export->exp_obd_chain_timed,
882 &export->exp_obd->obd_exports_timed);
883 export->exp_obd->obd_num_exports++;
884 cfs_spin_unlock(&obd->obd_dev_lock);
885 cfs_hash_putref(hash);
889 cfs_spin_unlock(&obd->obd_dev_lock);
892 cfs_hash_putref(hash);
893 class_handle_unhash(&export->exp_handle);
894 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
895 obd_destroy_export(export);
896 OBD_FREE_PTR(export);
899 EXPORT_SYMBOL(class_new_export);
901 void class_unlink_export(struct obd_export *exp)
903 class_handle_unhash(&exp->exp_handle);
905 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
906 /* delete an uuid-export hashitem from hashtables */
907 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
908 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
909 &exp->exp_client_uuid,
910 &exp->exp_uuid_hash);
912 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
913 cfs_list_del_init(&exp->exp_obd_chain_timed);
914 exp->exp_obd->obd_num_exports--;
915 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
916 class_export_put(exp);
918 EXPORT_SYMBOL(class_unlink_export);
920 /* Import management functions */
921 void class_import_destroy(struct obd_import *imp)
925 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
926 imp->imp_obd->obd_name);
928 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
930 ptlrpc_put_connection_superhack(imp->imp_connection);
932 while (!cfs_list_empty(&imp->imp_conn_list)) {
933 struct obd_import_conn *imp_conn;
935 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
936 struct obd_import_conn, oic_item);
937 cfs_list_del_init(&imp_conn->oic_item);
938 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
939 OBD_FREE(imp_conn, sizeof(*imp_conn));
942 LASSERT(imp->imp_sec == NULL);
943 class_decref(imp->imp_obd, "import", imp);
944 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
948 static void import_handle_addref(void *import)
950 class_import_get(import);
953 static struct portals_handle_ops import_handle_ops = {
954 .hop_addref = import_handle_addref,
958 struct obd_import *class_import_get(struct obd_import *import)
960 cfs_atomic_inc(&import->imp_refcount);
961 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
962 cfs_atomic_read(&import->imp_refcount),
963 import->imp_obd->obd_name);
966 EXPORT_SYMBOL(class_import_get);
968 void class_import_put(struct obd_import *imp)
972 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
973 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
975 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
976 cfs_atomic_read(&imp->imp_refcount) - 1,
977 imp->imp_obd->obd_name);
979 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
980 CDEBUG(D_INFO, "final put import %p\n", imp);
981 obd_zombie_import_add(imp);
984 /* catch possible import put race */
985 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
988 EXPORT_SYMBOL(class_import_put);
990 static void init_imp_at(struct imp_at *at) {
992 at_init(&at->iat_net_latency, 0, 0);
993 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
994 /* max service estimates are tracked on the server side, so
995 don't use the AT history here, just use the last reported
996 val. (But keep hist for proc histogram, worst_ever) */
997 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1002 struct obd_import *class_new_import(struct obd_device *obd)
1004 struct obd_import *imp;
1006 OBD_ALLOC(imp, sizeof(*imp));
1010 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1011 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1012 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1013 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1014 cfs_spin_lock_init(&imp->imp_lock);
1015 imp->imp_last_success_conn = 0;
1016 imp->imp_state = LUSTRE_IMP_NEW;
1017 imp->imp_obd = class_incref(obd, "import", imp);
1018 cfs_mutex_init(&imp->imp_sec_mutex);
1019 cfs_waitq_init(&imp->imp_recovery_waitq);
1021 cfs_atomic_set(&imp->imp_refcount, 2);
1022 cfs_atomic_set(&imp->imp_unregistering, 0);
1023 cfs_atomic_set(&imp->imp_inflight, 0);
1024 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1025 cfs_atomic_set(&imp->imp_inval_count, 0);
1026 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1027 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1028 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1029 init_imp_at(&imp->imp_at);
1031 /* the default magic is V2, will be used in connect RPC, and
1032 * then adjusted according to the flags in request/reply. */
1033 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1037 EXPORT_SYMBOL(class_new_import);
1039 void class_destroy_import(struct obd_import *import)
1041 LASSERT(import != NULL);
1042 LASSERT(import != LP_POISON);
1044 class_handle_unhash(&import->imp_handle);
1046 cfs_spin_lock(&import->imp_lock);
1047 import->imp_generation++;
1048 cfs_spin_unlock(&import->imp_lock);
1049 class_import_put(import);
1051 EXPORT_SYMBOL(class_destroy_import);
1053 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1055 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1057 cfs_spin_lock(&exp->exp_locks_list_guard);
1059 LASSERT(lock->l_exp_refs_nr >= 0);
1061 if (lock->l_exp_refs_target != NULL &&
1062 lock->l_exp_refs_target != exp) {
1063 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1064 exp, lock, lock->l_exp_refs_target);
1066 if ((lock->l_exp_refs_nr ++) == 0) {
1067 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1068 lock->l_exp_refs_target = exp;
1070 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1071 lock, exp, lock->l_exp_refs_nr);
1072 cfs_spin_unlock(&exp->exp_locks_list_guard);
1074 EXPORT_SYMBOL(__class_export_add_lock_ref);
1076 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1078 cfs_spin_lock(&exp->exp_locks_list_guard);
1079 LASSERT(lock->l_exp_refs_nr > 0);
1080 if (lock->l_exp_refs_target != exp) {
1081 LCONSOLE_WARN("lock %p, "
1082 "mismatching export pointers: %p, %p\n",
1083 lock, lock->l_exp_refs_target, exp);
1085 if (-- lock->l_exp_refs_nr == 0) {
1086 cfs_list_del_init(&lock->l_exp_refs_link);
1087 lock->l_exp_refs_target = NULL;
1089 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1090 lock, exp, lock->l_exp_refs_nr);
1091 cfs_spin_unlock(&exp->exp_locks_list_guard);
1093 EXPORT_SYMBOL(__class_export_del_lock_ref);
1096 /* A connection defines an export context in which preallocation can
1097 be managed. This releases the export pointer reference, and returns
1098 the export handle, so the export refcount is 1 when this function
1100 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1101 struct obd_uuid *cluuid)
1103 struct obd_export *export;
1104 LASSERT(conn != NULL);
1105 LASSERT(obd != NULL);
1106 LASSERT(cluuid != NULL);
1109 export = class_new_export(obd, cluuid);
1111 RETURN(PTR_ERR(export));
1113 conn->cookie = export->exp_handle.h_cookie;
1114 class_export_put(export);
1116 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1117 cluuid->uuid, conn->cookie);
1120 EXPORT_SYMBOL(class_connect);
1122 /* if export is involved in recovery then clean up related things */
1123 void class_export_recovery_cleanup(struct obd_export *exp)
1125 struct obd_device *obd = exp->exp_obd;
1127 cfs_spin_lock(&obd->obd_recovery_task_lock);
1128 if (exp->exp_delayed)
1129 obd->obd_delayed_clients--;
1130 if (obd->obd_recovering && exp->exp_in_recovery) {
1131 cfs_spin_lock(&exp->exp_lock);
1132 exp->exp_in_recovery = 0;
1133 cfs_spin_unlock(&exp->exp_lock);
1134 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1135 cfs_atomic_dec(&obd->obd_connected_clients);
1137 cfs_spin_unlock(&obd->obd_recovery_task_lock);
1138 /** Cleanup req replay fields */
1139 if (exp->exp_req_replay_needed) {
1140 cfs_spin_lock(&exp->exp_lock);
1141 exp->exp_req_replay_needed = 0;
1142 cfs_spin_unlock(&exp->exp_lock);
1143 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1144 cfs_atomic_dec(&obd->obd_req_replay_clients);
1146 /** Cleanup lock replay data */
1147 if (exp->exp_lock_replay_needed) {
1148 cfs_spin_lock(&exp->exp_lock);
1149 exp->exp_lock_replay_needed = 0;
1150 cfs_spin_unlock(&exp->exp_lock);
1151 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1152 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1156 /* This function removes 1-3 references from the export:
1157 * 1 - for export pointer passed
1158 * and if disconnect really need
1159 * 2 - removing from hash
1160 * 3 - in client_unlink_export
1161 * The export pointer passed to this function can destroyed */
1162 int class_disconnect(struct obd_export *export)
1164 int already_disconnected;
1167 if (export == NULL) {
1168 CWARN("attempting to free NULL export %p\n", export);
1172 cfs_spin_lock(&export->exp_lock);
1173 already_disconnected = export->exp_disconnected;
1174 export->exp_disconnected = 1;
1175 cfs_spin_unlock(&export->exp_lock);
1177 /* class_cleanup(), abort_recovery(), and class_fail_export()
1178 * all end up in here, and if any of them race we shouldn't
1179 * call extra class_export_puts(). */
1180 if (already_disconnected) {
1181 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1182 GOTO(no_disconn, already_disconnected);
1185 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1186 export->exp_handle.h_cookie);
1188 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1189 cfs_hash_del(export->exp_obd->obd_nid_hash,
1190 &export->exp_connection->c_peer.nid,
1191 &export->exp_nid_hash);
1193 class_export_recovery_cleanup(export);
1194 class_unlink_export(export);
1196 class_export_put(export);
1199 EXPORT_SYMBOL(class_disconnect);
1201 /* Return non-zero for a fully connected export */
1202 int class_connected_export(struct obd_export *exp)
1206 cfs_spin_lock(&exp->exp_lock);
1207 connected = (exp->exp_conn_cnt > 0);
1208 cfs_spin_unlock(&exp->exp_lock);
1213 EXPORT_SYMBOL(class_connected_export);
1215 static void class_disconnect_export_list(cfs_list_t *list,
1216 enum obd_option flags)
1219 struct obd_export *exp;
1222 /* It's possible that an export may disconnect itself, but
1223 * nothing else will be added to this list. */
1224 while (!cfs_list_empty(list)) {
1225 exp = cfs_list_entry(list->next, struct obd_export,
1227 /* need for safe call CDEBUG after obd_disconnect */
1228 class_export_get(exp);
1230 cfs_spin_lock(&exp->exp_lock);
1231 exp->exp_flags = flags;
1232 cfs_spin_unlock(&exp->exp_lock);
1234 if (obd_uuid_equals(&exp->exp_client_uuid,
1235 &exp->exp_obd->obd_uuid)) {
1237 "exp %p export uuid == obd uuid, don't discon\n",
1239 /* Need to delete this now so we don't end up pointing
1240 * to work_list later when this export is cleaned up. */
1241 cfs_list_del_init(&exp->exp_obd_chain);
1242 class_export_put(exp);
1246 class_export_get(exp);
1247 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1248 "last request at "CFS_TIME_T"\n",
1249 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1250 exp, exp->exp_last_request_time);
1251 /* release one export reference anyway */
1252 rc = obd_disconnect(exp);
1254 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1255 obd_export_nid2str(exp), exp, rc);
1256 class_export_put(exp);
1261 void class_disconnect_exports(struct obd_device *obd)
1263 cfs_list_t work_list;
1266 /* Move all of the exports from obd_exports to a work list, en masse. */
1267 CFS_INIT_LIST_HEAD(&work_list);
1268 cfs_spin_lock(&obd->obd_dev_lock);
1269 cfs_list_splice_init(&obd->obd_exports, &work_list);
1270 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1271 cfs_spin_unlock(&obd->obd_dev_lock);
1273 if (!cfs_list_empty(&work_list)) {
1274 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1275 "disconnecting them\n", obd->obd_minor, obd);
1276 class_disconnect_export_list(&work_list,
1277 exp_flags_from_obd(obd));
1279 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1280 obd->obd_minor, obd);
1283 EXPORT_SYMBOL(class_disconnect_exports);
1285 /* Remove exports that have not completed recovery.
1287 void class_disconnect_stale_exports(struct obd_device *obd,
1288 int (*test_export)(struct obd_export *))
1290 cfs_list_t work_list;
1291 struct obd_export *exp, *n;
1295 CFS_INIT_LIST_HEAD(&work_list);
1296 cfs_spin_lock(&obd->obd_dev_lock);
1297 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1299 /* don't count self-export as client */
1300 if (obd_uuid_equals(&exp->exp_client_uuid,
1301 &exp->exp_obd->obd_uuid))
1304 /* don't evict clients which have no slot in last_rcvd
1305 * (e.g. lightweight connection) */
1306 if (exp->exp_target_data.ted_lr_idx == -1)
1309 cfs_spin_lock(&exp->exp_lock);
1310 if (test_export(exp)) {
1311 cfs_spin_unlock(&exp->exp_lock);
1314 exp->exp_failed = 1;
1315 cfs_spin_unlock(&exp->exp_lock);
1317 cfs_list_move(&exp->exp_obd_chain, &work_list);
1319 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1320 obd->obd_name, exp->exp_client_uuid.uuid,
1321 exp->exp_connection == NULL ? "<unknown>" :
1322 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1323 print_export_data(exp, "EVICTING", 0);
1325 cfs_spin_unlock(&obd->obd_dev_lock);
1328 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1329 obd->obd_name, evicted);
1330 obd->obd_stale_clients += evicted;
1332 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1333 OBD_OPT_ABORT_RECOV);
1336 EXPORT_SYMBOL(class_disconnect_stale_exports);
1338 void class_fail_export(struct obd_export *exp)
1340 int rc, already_failed;
1342 cfs_spin_lock(&exp->exp_lock);
1343 already_failed = exp->exp_failed;
1344 exp->exp_failed = 1;
1345 cfs_spin_unlock(&exp->exp_lock);
1347 if (already_failed) {
1348 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1349 exp, exp->exp_client_uuid.uuid);
1353 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1354 exp, exp->exp_client_uuid.uuid);
1356 if (obd_dump_on_timeout)
1357 libcfs_debug_dumplog();
1359 /* need for safe call CDEBUG after obd_disconnect */
1360 class_export_get(exp);
1362 /* Most callers into obd_disconnect are removing their own reference
1363 * (request, for example) in addition to the one from the hash table.
1364 * We don't have such a reference here, so make one. */
1365 class_export_get(exp);
1366 rc = obd_disconnect(exp);
1368 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1370 CDEBUG(D_HA, "disconnected export %p/%s\n",
1371 exp, exp->exp_client_uuid.uuid);
1372 class_export_put(exp);
1374 EXPORT_SYMBOL(class_fail_export);
1376 char *obd_export_nid2str(struct obd_export *exp)
1378 if (exp->exp_connection != NULL)
1379 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1383 EXPORT_SYMBOL(obd_export_nid2str);
1385 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1387 struct obd_export *doomed_exp = NULL;
1388 int exports_evicted = 0;
1390 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1393 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1394 if (doomed_exp == NULL)
1397 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1398 "nid %s found, wanted nid %s, requested nid %s\n",
1399 obd_export_nid2str(doomed_exp),
1400 libcfs_nid2str(nid_key), nid);
1401 LASSERTF(doomed_exp != obd->obd_self_export,
1402 "self-export is hashed by NID?\n");
1404 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1405 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1407 class_fail_export(doomed_exp);
1408 class_export_put(doomed_exp);
1411 if (!exports_evicted)
1412 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1413 obd->obd_name, nid);
1414 return exports_evicted;
1416 EXPORT_SYMBOL(obd_export_evict_by_nid);
1418 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1420 struct obd_export *doomed_exp = NULL;
1421 struct obd_uuid doomed_uuid;
1422 int exports_evicted = 0;
1424 obd_str2uuid(&doomed_uuid, uuid);
1425 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1426 CERROR("%s: can't evict myself\n", obd->obd_name);
1427 return exports_evicted;
1430 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1432 if (doomed_exp == NULL) {
1433 CERROR("%s: can't disconnect %s: no exports found\n",
1434 obd->obd_name, uuid);
1436 CWARN("%s: evicting %s at adminstrative request\n",
1437 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1438 class_fail_export(doomed_exp);
1439 class_export_put(doomed_exp);
1443 return exports_evicted;
1445 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1447 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1448 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1449 EXPORT_SYMBOL(class_export_dump_hook);
1452 static void print_export_data(struct obd_export *exp, const char *status,
1455 struct ptlrpc_reply_state *rs;
1456 struct ptlrpc_reply_state *first_reply = NULL;
1459 cfs_spin_lock(&exp->exp_lock);
1460 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1466 cfs_spin_unlock(&exp->exp_lock);
1468 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1469 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1470 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1471 cfs_atomic_read(&exp->exp_rpc_count),
1472 cfs_atomic_read(&exp->exp_cb_count),
1473 cfs_atomic_read(&exp->exp_locks_count),
1474 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1475 nreplies, first_reply, nreplies > 3 ? "..." : "",
1476 exp->exp_last_committed);
1477 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1478 if (locks && class_export_dump_hook != NULL)
1479 class_export_dump_hook(exp);
1483 void dump_exports(struct obd_device *obd, int locks)
1485 struct obd_export *exp;
1487 cfs_spin_lock(&obd->obd_dev_lock);
1488 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1489 print_export_data(exp, "ACTIVE", locks);
1490 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1491 print_export_data(exp, "UNLINKED", locks);
1492 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1493 print_export_data(exp, "DELAYED", locks);
1494 cfs_spin_unlock(&obd->obd_dev_lock);
1495 cfs_spin_lock(&obd_zombie_impexp_lock);
1496 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1497 print_export_data(exp, "ZOMBIE", locks);
1498 cfs_spin_unlock(&obd_zombie_impexp_lock);
1500 EXPORT_SYMBOL(dump_exports);
1502 void obd_exports_barrier(struct obd_device *obd)
1505 LASSERT(cfs_list_empty(&obd->obd_exports));
1506 cfs_spin_lock(&obd->obd_dev_lock);
1507 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1508 cfs_spin_unlock(&obd->obd_dev_lock);
1509 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1510 cfs_time_seconds(waited));
1511 if (waited > 5 && IS_PO2(waited)) {
1512 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1513 "more than %d seconds. "
1514 "The obd refcount = %d. Is it stuck?\n",
1515 obd->obd_name, waited,
1516 cfs_atomic_read(&obd->obd_refcount));
1517 dump_exports(obd, 1);
1520 cfs_spin_lock(&obd->obd_dev_lock);
1522 cfs_spin_unlock(&obd->obd_dev_lock);
1524 EXPORT_SYMBOL(obd_exports_barrier);
1526 /* Total amount of zombies to be destroyed */
1527 static int zombies_count = 0;
1530 * kill zombie imports and exports
1532 void obd_zombie_impexp_cull(void)
1534 struct obd_import *import;
1535 struct obd_export *export;
1539 cfs_spin_lock(&obd_zombie_impexp_lock);
1542 if (!cfs_list_empty(&obd_zombie_imports)) {
1543 import = cfs_list_entry(obd_zombie_imports.next,
1546 cfs_list_del_init(&import->imp_zombie_chain);
1550 if (!cfs_list_empty(&obd_zombie_exports)) {
1551 export = cfs_list_entry(obd_zombie_exports.next,
1554 cfs_list_del_init(&export->exp_obd_chain);
1557 cfs_spin_unlock(&obd_zombie_impexp_lock);
1559 if (import != NULL) {
1560 class_import_destroy(import);
1561 cfs_spin_lock(&obd_zombie_impexp_lock);
1563 cfs_spin_unlock(&obd_zombie_impexp_lock);
1566 if (export != NULL) {
1567 class_export_destroy(export);
1568 cfs_spin_lock(&obd_zombie_impexp_lock);
1570 cfs_spin_unlock(&obd_zombie_impexp_lock);
1574 } while (import != NULL || export != NULL);
1578 static cfs_completion_t obd_zombie_start;
1579 static cfs_completion_t obd_zombie_stop;
1580 static unsigned long obd_zombie_flags;
1581 static cfs_waitq_t obd_zombie_waitq;
1582 static pid_t obd_zombie_pid;
1585 OBD_ZOMBIE_STOP = 1 << 1
1589 * check for work for kill zombie import/export thread.
1591 static int obd_zombie_impexp_check(void *arg)
1595 cfs_spin_lock(&obd_zombie_impexp_lock);
1596 rc = (zombies_count == 0) &&
1597 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1598 cfs_spin_unlock(&obd_zombie_impexp_lock);
1604 * Add export to the obd_zombe thread and notify it.
1606 static void obd_zombie_export_add(struct obd_export *exp) {
1607 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1608 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1609 cfs_list_del_init(&exp->exp_obd_chain);
1610 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1611 cfs_spin_lock(&obd_zombie_impexp_lock);
1613 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1614 cfs_spin_unlock(&obd_zombie_impexp_lock);
1616 obd_zombie_impexp_notify();
1620 * Add import to the obd_zombe thread and notify it.
1622 static void obd_zombie_import_add(struct obd_import *imp) {
1623 LASSERT(imp->imp_sec == NULL);
1624 LASSERT(imp->imp_rq_pool == NULL);
1625 cfs_spin_lock(&obd_zombie_impexp_lock);
1626 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1628 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1629 cfs_spin_unlock(&obd_zombie_impexp_lock);
1631 obd_zombie_impexp_notify();
1635 * notify import/export destroy thread about new zombie.
1637 static void obd_zombie_impexp_notify(void)
1640 * Make sure obd_zomebie_impexp_thread get this notification.
1641 * It is possible this signal only get by obd_zombie_barrier, and
1642 * barrier gulps this notification and sleeps away and hangs ensues
1644 cfs_waitq_broadcast(&obd_zombie_waitq);
1648 * check whether obd_zombie is idle
1650 static int obd_zombie_is_idle(void)
1654 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1655 cfs_spin_lock(&obd_zombie_impexp_lock);
1656 rc = (zombies_count == 0);
1657 cfs_spin_unlock(&obd_zombie_impexp_lock);
1662 * wait when obd_zombie import/export queues become empty
1664 void obd_zombie_barrier(void)
1666 struct l_wait_info lwi = { 0 };
1668 if (obd_zombie_pid == cfs_curproc_pid())
1669 /* don't wait for myself */
1671 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1673 EXPORT_SYMBOL(obd_zombie_barrier);
1678 * destroy zombie export/import thread.
1680 static int obd_zombie_impexp_thread(void *unused)
1684 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1685 cfs_complete(&obd_zombie_start);
1689 cfs_complete(&obd_zombie_start);
1691 obd_zombie_pid = cfs_curproc_pid();
1693 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1694 struct l_wait_info lwi = { 0 };
1696 l_wait_event(obd_zombie_waitq,
1697 !obd_zombie_impexp_check(NULL), &lwi);
1698 obd_zombie_impexp_cull();
1701 * Notify obd_zombie_barrier callers that queues
1704 cfs_waitq_signal(&obd_zombie_waitq);
1707 cfs_complete(&obd_zombie_stop);
1712 #else /* ! KERNEL */
1714 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1715 static void *obd_zombie_impexp_work_cb;
1716 static void *obd_zombie_impexp_idle_cb;
1718 int obd_zombie_impexp_kill(void *arg)
1722 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1723 obd_zombie_impexp_cull();
1726 cfs_atomic_dec(&zombie_recur);
1733 * start destroy zombie import/export thread
1735 int obd_zombie_impexp_init(void)
1739 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1740 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1741 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1742 cfs_init_completion(&obd_zombie_start);
1743 cfs_init_completion(&obd_zombie_stop);
1744 cfs_waitq_init(&obd_zombie_waitq);
1748 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1752 cfs_wait_for_completion(&obd_zombie_start);
1755 obd_zombie_impexp_work_cb =
1756 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1757 &obd_zombie_impexp_kill, NULL);
1759 obd_zombie_impexp_idle_cb =
1760 liblustre_register_idle_callback("obd_zombi_impexp_check",
1761 &obd_zombie_impexp_check, NULL);
1767 * stop destroy zombie import/export thread
1769 void obd_zombie_impexp_stop(void)
1771 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1772 obd_zombie_impexp_notify();
1774 cfs_wait_for_completion(&obd_zombie_stop);
1776 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1777 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1781 /***** Kernel-userspace comm helpers *******/
1783 /* Get length of entire message, including header */
1784 int kuc_len(int payload_len)
1786 return sizeof(struct kuc_hdr) + payload_len;
1788 EXPORT_SYMBOL(kuc_len);
1790 /* Get a pointer to kuc header, given a ptr to the payload
1791 * @param p Pointer to payload area
1792 * @returns Pointer to kuc header
1794 struct kuc_hdr * kuc_ptr(void *p)
1796 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1797 LASSERT(lh->kuc_magic == KUC_MAGIC);
1800 EXPORT_SYMBOL(kuc_ptr);
1802 /* Test if payload is part of kuc message
1803 * @param p Pointer to payload area
1806 int kuc_ispayload(void *p)
1808 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1810 if (kh->kuc_magic == KUC_MAGIC)
1815 EXPORT_SYMBOL(kuc_ispayload);
1817 /* Alloc space for a message, and fill in header
1818 * @return Pointer to payload area
1820 void *kuc_alloc(int payload_len, int transport, int type)
1823 int len = kuc_len(payload_len);
1827 return ERR_PTR(-ENOMEM);
1829 lh->kuc_magic = KUC_MAGIC;
1830 lh->kuc_transport = transport;
1831 lh->kuc_msgtype = type;
1832 lh->kuc_msglen = len;
1834 return (void *)(lh + 1);
1836 EXPORT_SYMBOL(kuc_alloc);
1838 /* Takes pointer to payload area */
1839 inline void kuc_free(void *p, int payload_len)
1841 struct kuc_hdr *lh = kuc_ptr(p);
1842 OBD_FREE(lh, kuc_len(payload_len));
1844 EXPORT_SYMBOL(kuc_free);