4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 cfs_spinlock_t obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 struct obd_type *class_search_type(const char *name)
102 struct obd_type *type;
104 cfs_spin_lock(&obd_types_lock);
105 cfs_list_for_each(tmp, &obd_types) {
106 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 cfs_spin_unlock(&obd_types_lock);
112 cfs_spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
124 if (!cfs_request_module("%s", modname)) {
125 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126 type = class_search_type(name);
128 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134 cfs_spin_lock(&type->obd_type_lock);
136 cfs_try_module_get(type->typ_dt_ops->o_owner);
137 cfs_spin_unlock(&type->obd_type_lock);
141 EXPORT_SYMBOL(class_get_type);
143 void class_put_type(struct obd_type *type)
146 cfs_spin_lock(&type->obd_type_lock);
148 cfs_module_put(type->typ_dt_ops->o_owner);
149 cfs_spin_unlock(&type->obd_type_lock);
151 EXPORT_SYMBOL(class_put_type);
153 #define CLASS_MAX_NAME 1024
155 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
156 struct lprocfs_vars *vars, const char *name,
157 struct lu_device_type *ldt)
159 struct obd_type *type;
164 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
166 if (class_search_type(name)) {
167 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
172 OBD_ALLOC(type, sizeof(*type));
176 OBD_ALLOC_PTR(type->typ_dt_ops);
177 OBD_ALLOC_PTR(type->typ_md_ops);
178 OBD_ALLOC(type->typ_name, strlen(name) + 1);
180 if (type->typ_dt_ops == NULL ||
181 type->typ_md_ops == NULL ||
182 type->typ_name == NULL)
185 *(type->typ_dt_ops) = *dt_ops;
186 /* md_ops is optional */
188 *(type->typ_md_ops) = *md_ops;
189 strcpy(type->typ_name, name);
190 cfs_spin_lock_init(&type->obd_type_lock);
193 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
195 if (IS_ERR(type->typ_procroot)) {
196 rc = PTR_ERR(type->typ_procroot);
197 type->typ_procroot = NULL;
203 rc = lu_device_type_init(ldt);
208 cfs_spin_lock(&obd_types_lock);
209 cfs_list_add(&type->typ_chain, &obd_types);
210 cfs_spin_unlock(&obd_types_lock);
215 if (type->typ_name != NULL)
216 OBD_FREE(type->typ_name, strlen(name) + 1);
217 if (type->typ_md_ops != NULL)
218 OBD_FREE_PTR(type->typ_md_ops);
219 if (type->typ_dt_ops != NULL)
220 OBD_FREE_PTR(type->typ_dt_ops);
221 OBD_FREE(type, sizeof(*type));
224 EXPORT_SYMBOL(class_register_type);
226 int class_unregister_type(const char *name)
228 struct obd_type *type = class_search_type(name);
232 CERROR("unknown obd type\n");
236 if (type->typ_refcnt) {
237 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
238 /* This is a bad situation, let's make the best of it */
239 /* Remove ops, but leave the name for debugging */
240 OBD_FREE_PTR(type->typ_dt_ops);
241 OBD_FREE_PTR(type->typ_md_ops);
245 /* we do not use type->typ_procroot as for compatibility purposes
246 * other modules can share names (i.e. lod can use lov entry). so
247 * we can't reference pointer as it can get invalided when another
248 * module removes the entry */
249 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
252 lu_device_type_fini(type->typ_lu);
254 cfs_spin_lock(&obd_types_lock);
255 cfs_list_del(&type->typ_chain);
256 cfs_spin_unlock(&obd_types_lock);
257 OBD_FREE(type->typ_name, strlen(name) + 1);
258 if (type->typ_dt_ops != NULL)
259 OBD_FREE_PTR(type->typ_dt_ops);
260 if (type->typ_md_ops != NULL)
261 OBD_FREE_PTR(type->typ_md_ops);
262 OBD_FREE(type, sizeof(*type));
264 } /* class_unregister_type */
265 EXPORT_SYMBOL(class_unregister_type);
268 * Create a new obd device.
270 * Find an empty slot in ::obd_devs[], create a new obd device in it.
272 * \param[in] type_name obd device type string.
273 * \param[in] name obd device name.
275 * \retval NULL if create fails, otherwise return the obd device
278 struct obd_device *class_newdev(const char *type_name, const char *name)
280 struct obd_device *result = NULL;
281 struct obd_device *newdev;
282 struct obd_type *type = NULL;
284 int new_obd_minor = 0;
287 if (strlen(name) >= MAX_OBD_NAME) {
288 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
289 RETURN(ERR_PTR(-EINVAL));
292 type = class_get_type(type_name);
294 CERROR("OBD: unknown type: %s\n", type_name);
295 RETURN(ERR_PTR(-ENODEV));
298 newdev = obd_device_alloc();
299 if (newdev == NULL) {
300 class_put_type(type);
301 RETURN(ERR_PTR(-ENOMEM));
303 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
305 cfs_write_lock(&obd_dev_lock);
306 for (i = 0; i < class_devno_max(); i++) {
307 struct obd_device *obd = class_num2obd(i);
309 if (obd && obd->obd_name &&
310 (strcmp(name, obd->obd_name) == 0)) {
311 CERROR("Device %s already exists at %d, won't add\n",
314 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
315 "%p obd_magic %08x != %08x\n", result,
316 result->obd_magic, OBD_DEVICE_MAGIC);
317 LASSERTF(result->obd_minor == new_obd_minor,
318 "%p obd_minor %d != %d\n", result,
319 result->obd_minor, new_obd_minor);
321 obd_devs[result->obd_minor] = NULL;
322 result->obd_name[0]='\0';
324 result = ERR_PTR(-EEXIST);
327 if (!result && !obd) {
329 result->obd_minor = i;
331 result->obd_type = type;
332 strncpy(result->obd_name, name,
333 sizeof(result->obd_name) - 1);
334 obd_devs[i] = result;
337 cfs_write_unlock(&obd_dev_lock);
339 if (result == NULL && i >= class_devno_max()) {
340 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
342 RETURN(ERR_PTR(-EOVERFLOW));
345 if (IS_ERR(result)) {
346 obd_device_free(newdev);
347 class_put_type(type);
349 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
350 result->obd_name, result);
355 void class_release_dev(struct obd_device *obd)
357 struct obd_type *obd_type = obd->obd_type;
359 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
360 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
361 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
362 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
363 LASSERT(obd_type != NULL);
365 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
366 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
368 cfs_write_lock(&obd_dev_lock);
369 obd_devs[obd->obd_minor] = NULL;
370 cfs_write_unlock(&obd_dev_lock);
371 obd_device_free(obd);
373 class_put_type(obd_type);
376 int class_name2dev(const char *name)
383 cfs_read_lock(&obd_dev_lock);
384 for (i = 0; i < class_devno_max(); i++) {
385 struct obd_device *obd = class_num2obd(i);
387 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
388 /* Make sure we finished attaching before we give
389 out any references */
390 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
391 if (obd->obd_attached) {
392 cfs_read_unlock(&obd_dev_lock);
398 cfs_read_unlock(&obd_dev_lock);
402 EXPORT_SYMBOL(class_name2dev);
404 struct obd_device *class_name2obd(const char *name)
406 int dev = class_name2dev(name);
408 if (dev < 0 || dev > class_devno_max())
410 return class_num2obd(dev);
412 EXPORT_SYMBOL(class_name2obd);
414 int class_uuid2dev(struct obd_uuid *uuid)
418 cfs_read_lock(&obd_dev_lock);
419 for (i = 0; i < class_devno_max(); i++) {
420 struct obd_device *obd = class_num2obd(i);
422 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
423 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
424 cfs_read_unlock(&obd_dev_lock);
428 cfs_read_unlock(&obd_dev_lock);
432 EXPORT_SYMBOL(class_uuid2dev);
434 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
436 int dev = class_uuid2dev(uuid);
439 return class_num2obd(dev);
441 EXPORT_SYMBOL(class_uuid2obd);
444 * Get obd device from ::obd_devs[]
446 * \param num [in] array index
448 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
449 * otherwise return the obd device there.
451 struct obd_device *class_num2obd(int num)
453 struct obd_device *obd = NULL;
455 if (num < class_devno_max()) {
460 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
461 "%p obd_magic %08x != %08x\n",
462 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
463 LASSERTF(obd->obd_minor == num,
464 "%p obd_minor %0d != %0d\n",
465 obd, obd->obd_minor, num);
470 EXPORT_SYMBOL(class_num2obd);
472 void class_obd_list(void)
477 cfs_read_lock(&obd_dev_lock);
478 for (i = 0; i < class_devno_max(); i++) {
479 struct obd_device *obd = class_num2obd(i);
483 if (obd->obd_stopping)
485 else if (obd->obd_set_up)
487 else if (obd->obd_attached)
491 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
492 i, status, obd->obd_type->typ_name,
493 obd->obd_name, obd->obd_uuid.uuid,
494 cfs_atomic_read(&obd->obd_refcount));
496 cfs_read_unlock(&obd_dev_lock);
500 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
501 specified, then only the client with that uuid is returned,
502 otherwise any client connected to the tgt is returned. */
503 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
504 const char * typ_name,
505 struct obd_uuid *grp_uuid)
509 cfs_read_lock(&obd_dev_lock);
510 for (i = 0; i < class_devno_max(); i++) {
511 struct obd_device *obd = class_num2obd(i);
515 if ((strncmp(obd->obd_type->typ_name, typ_name,
516 strlen(typ_name)) == 0)) {
517 if (obd_uuid_equals(tgt_uuid,
518 &obd->u.cli.cl_target_uuid) &&
519 ((grp_uuid)? obd_uuid_equals(grp_uuid,
520 &obd->obd_uuid) : 1)) {
521 cfs_read_unlock(&obd_dev_lock);
526 cfs_read_unlock(&obd_dev_lock);
530 EXPORT_SYMBOL(class_find_client_obd);
532 /* Iterate the obd_device list looking devices have grp_uuid. Start
533 searching at *next, and if a device is found, the next index to look
534 at is saved in *next. If next is NULL, then the first matching device
535 will always be returned. */
536 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
542 else if (*next >= 0 && *next < class_devno_max())
547 cfs_read_lock(&obd_dev_lock);
548 for (; i < class_devno_max(); i++) {
549 struct obd_device *obd = class_num2obd(i);
553 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
556 cfs_read_unlock(&obd_dev_lock);
560 cfs_read_unlock(&obd_dev_lock);
564 EXPORT_SYMBOL(class_devices_in_group);
567 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
568 * adjust sptlrpc settings accordingly.
570 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
572 struct obd_device *obd;
576 LASSERT(namelen > 0);
578 cfs_read_lock(&obd_dev_lock);
579 for (i = 0; i < class_devno_max(); i++) {
580 obd = class_num2obd(i);
582 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
585 /* only notify mdc, osc, mdt, ost */
586 type = obd->obd_type->typ_name;
587 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
588 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
589 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
590 strcmp(type, LUSTRE_OST_NAME) != 0)
593 if (strncmp(obd->obd_name, fsname, namelen))
596 class_incref(obd, __FUNCTION__, obd);
597 cfs_read_unlock(&obd_dev_lock);
598 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
599 sizeof(KEY_SPTLRPC_CONF),
600 KEY_SPTLRPC_CONF, 0, NULL, NULL);
602 class_decref(obd, __FUNCTION__, obd);
603 cfs_read_lock(&obd_dev_lock);
605 cfs_read_unlock(&obd_dev_lock);
608 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
610 void obd_cleanup_caches(void)
615 if (obd_device_cachep) {
616 rc = cfs_mem_cache_destroy(obd_device_cachep);
617 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
618 obd_device_cachep = NULL;
621 rc = cfs_mem_cache_destroy(obdo_cachep);
622 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
626 rc = cfs_mem_cache_destroy(import_cachep);
627 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
628 import_cachep = NULL;
631 rc = cfs_mem_cache_destroy(capa_cachep);
632 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
638 int obd_init_caches(void)
642 LASSERT(obd_device_cachep == NULL);
643 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
644 sizeof(struct obd_device),
646 if (!obd_device_cachep)
649 LASSERT(obdo_cachep == NULL);
650 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
655 LASSERT(import_cachep == NULL);
656 import_cachep = cfs_mem_cache_create("ll_import_cache",
657 sizeof(struct obd_import),
662 LASSERT(capa_cachep == NULL);
663 capa_cachep = cfs_mem_cache_create("capa_cache",
664 sizeof(struct obd_capa), 0, 0);
670 obd_cleanup_caches();
675 /* map connection to client */
676 struct obd_export *class_conn2export(struct lustre_handle *conn)
678 struct obd_export *export;
682 CDEBUG(D_CACHE, "looking for null handle\n");
686 if (conn->cookie == -1) { /* this means assign a new connection */
687 CDEBUG(D_CACHE, "want a new connection\n");
691 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
692 export = class_handle2object(conn->cookie);
695 EXPORT_SYMBOL(class_conn2export);
697 struct obd_device *class_exp2obd(struct obd_export *exp)
703 EXPORT_SYMBOL(class_exp2obd);
705 struct obd_device *class_conn2obd(struct lustre_handle *conn)
707 struct obd_export *export;
708 export = class_conn2export(conn);
710 struct obd_device *obd = export->exp_obd;
711 class_export_put(export);
716 EXPORT_SYMBOL(class_conn2obd);
718 struct obd_import *class_exp2cliimp(struct obd_export *exp)
720 struct obd_device *obd = exp->exp_obd;
723 return obd->u.cli.cl_import;
725 EXPORT_SYMBOL(class_exp2cliimp);
727 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
729 struct obd_device *obd = class_conn2obd(conn);
732 return obd->u.cli.cl_import;
734 EXPORT_SYMBOL(class_conn2cliimp);
736 /* Export management functions */
737 static void class_export_destroy(struct obd_export *exp)
739 struct obd_device *obd = exp->exp_obd;
742 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
743 LASSERT(obd != NULL);
745 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
746 exp->exp_client_uuid.uuid, obd->obd_name);
748 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
749 if (exp->exp_connection)
750 ptlrpc_put_connection_superhack(exp->exp_connection);
752 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
753 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
754 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
755 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
756 obd_destroy_export(exp);
757 class_decref(obd, "export", exp);
759 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
763 static void export_handle_addref(void *export)
765 class_export_get(export);
768 static struct portals_handle_ops export_handle_ops = {
769 .hop_addref = export_handle_addref,
773 struct obd_export *class_export_get(struct obd_export *exp)
775 cfs_atomic_inc(&exp->exp_refcount);
776 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
777 cfs_atomic_read(&exp->exp_refcount));
780 EXPORT_SYMBOL(class_export_get);
782 void class_export_put(struct obd_export *exp)
784 LASSERT(exp != NULL);
785 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
786 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
787 cfs_atomic_read(&exp->exp_refcount) - 1);
789 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
790 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
791 CDEBUG(D_IOCTL, "final put %p/%s\n",
792 exp, exp->exp_client_uuid.uuid);
794 /* release nid stat refererence */
795 lprocfs_exp_cleanup(exp);
797 obd_zombie_export_add(exp);
800 EXPORT_SYMBOL(class_export_put);
802 /* Creates a new export, adds it to the hash table, and returns a
803 * pointer to it. The refcount is 2: one for the hash reference, and
804 * one for the pointer returned by this function. */
805 struct obd_export *class_new_export(struct obd_device *obd,
806 struct obd_uuid *cluuid)
808 struct obd_export *export;
809 cfs_hash_t *hash = NULL;
813 OBD_ALLOC_PTR(export);
815 return ERR_PTR(-ENOMEM);
817 export->exp_conn_cnt = 0;
818 export->exp_lock_hash = NULL;
819 export->exp_flock_hash = NULL;
820 cfs_atomic_set(&export->exp_refcount, 2);
821 cfs_atomic_set(&export->exp_rpc_count, 0);
822 cfs_atomic_set(&export->exp_cb_count, 0);
823 cfs_atomic_set(&export->exp_locks_count, 0);
824 #if LUSTRE_TRACKS_LOCK_EXP_REFS
825 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
826 cfs_spin_lock_init(&export->exp_locks_list_guard);
828 cfs_atomic_set(&export->exp_replay_count, 0);
829 export->exp_obd = obd;
830 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
831 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
832 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
833 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
834 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
835 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
836 class_handle_hash(&export->exp_handle, &export_handle_ops);
837 export->exp_last_request_time = cfs_time_current_sec();
838 cfs_spin_lock_init(&export->exp_lock);
839 cfs_spin_lock_init(&export->exp_rpc_lock);
840 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
841 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
842 cfs_spin_lock_init(&export->exp_bl_list_lock);
843 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
845 export->exp_sp_peer = LUSTRE_SP_ANY;
846 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
847 export->exp_client_uuid = *cluuid;
848 obd_init_export(export);
850 cfs_spin_lock(&obd->obd_dev_lock);
851 /* shouldn't happen, but might race */
852 if (obd->obd_stopping)
853 GOTO(exit_unlock, rc = -ENODEV);
855 hash = cfs_hash_getref(obd->obd_uuid_hash);
857 GOTO(exit_unlock, rc = -ENODEV);
858 cfs_spin_unlock(&obd->obd_dev_lock);
860 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
861 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
863 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
864 obd->obd_name, cluuid->uuid, rc);
865 GOTO(exit_err, rc = -EALREADY);
869 cfs_spin_lock(&obd->obd_dev_lock);
870 if (obd->obd_stopping) {
871 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
872 GOTO(exit_unlock, rc = -ENODEV);
875 class_incref(obd, "export", export);
876 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
877 cfs_list_add_tail(&export->exp_obd_chain_timed,
878 &export->exp_obd->obd_exports_timed);
879 export->exp_obd->obd_num_exports++;
880 cfs_spin_unlock(&obd->obd_dev_lock);
881 cfs_hash_putref(hash);
885 cfs_spin_unlock(&obd->obd_dev_lock);
888 cfs_hash_putref(hash);
889 class_handle_unhash(&export->exp_handle);
890 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
891 obd_destroy_export(export);
892 OBD_FREE_PTR(export);
895 EXPORT_SYMBOL(class_new_export);
897 void class_unlink_export(struct obd_export *exp)
899 class_handle_unhash(&exp->exp_handle);
901 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
902 /* delete an uuid-export hashitem from hashtables */
903 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
904 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
905 &exp->exp_client_uuid,
906 &exp->exp_uuid_hash);
908 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
909 cfs_list_del_init(&exp->exp_obd_chain_timed);
910 exp->exp_obd->obd_num_exports--;
911 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
912 class_export_put(exp);
914 EXPORT_SYMBOL(class_unlink_export);
916 /* Import management functions */
917 void class_import_destroy(struct obd_import *imp)
921 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
922 imp->imp_obd->obd_name);
924 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
926 ptlrpc_put_connection_superhack(imp->imp_connection);
928 while (!cfs_list_empty(&imp->imp_conn_list)) {
929 struct obd_import_conn *imp_conn;
931 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
932 struct obd_import_conn, oic_item);
933 cfs_list_del_init(&imp_conn->oic_item);
934 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
935 OBD_FREE(imp_conn, sizeof(*imp_conn));
938 LASSERT(imp->imp_sec == NULL);
939 class_decref(imp->imp_obd, "import", imp);
940 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
944 static void import_handle_addref(void *import)
946 class_import_get(import);
949 static struct portals_handle_ops import_handle_ops = {
950 .hop_addref = import_handle_addref,
954 struct obd_import *class_import_get(struct obd_import *import)
956 cfs_atomic_inc(&import->imp_refcount);
957 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
958 cfs_atomic_read(&import->imp_refcount),
959 import->imp_obd->obd_name);
962 EXPORT_SYMBOL(class_import_get);
964 void class_import_put(struct obd_import *imp)
968 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
969 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
971 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
972 cfs_atomic_read(&imp->imp_refcount) - 1,
973 imp->imp_obd->obd_name);
975 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
976 CDEBUG(D_INFO, "final put import %p\n", imp);
977 obd_zombie_import_add(imp);
980 /* catch possible import put race */
981 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
984 EXPORT_SYMBOL(class_import_put);
986 static void init_imp_at(struct imp_at *at) {
988 at_init(&at->iat_net_latency, 0, 0);
989 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
990 /* max service estimates are tracked on the server side, so
991 don't use the AT history here, just use the last reported
992 val. (But keep hist for proc histogram, worst_ever) */
993 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
998 struct obd_import *class_new_import(struct obd_device *obd)
1000 struct obd_import *imp;
1002 OBD_ALLOC(imp, sizeof(*imp));
1006 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1007 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1008 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1009 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1010 cfs_spin_lock_init(&imp->imp_lock);
1011 imp->imp_last_success_conn = 0;
1012 imp->imp_state = LUSTRE_IMP_NEW;
1013 imp->imp_obd = class_incref(obd, "import", imp);
1014 cfs_mutex_init(&imp->imp_sec_mutex);
1015 cfs_waitq_init(&imp->imp_recovery_waitq);
1017 cfs_atomic_set(&imp->imp_refcount, 2);
1018 cfs_atomic_set(&imp->imp_unregistering, 0);
1019 cfs_atomic_set(&imp->imp_inflight, 0);
1020 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1021 cfs_atomic_set(&imp->imp_inval_count, 0);
1022 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1023 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1024 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1025 init_imp_at(&imp->imp_at);
1027 /* the default magic is V2, will be used in connect RPC, and
1028 * then adjusted according to the flags in request/reply. */
1029 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1033 EXPORT_SYMBOL(class_new_import);
1035 void class_destroy_import(struct obd_import *import)
1037 LASSERT(import != NULL);
1038 LASSERT(import != LP_POISON);
1040 class_handle_unhash(&import->imp_handle);
1042 cfs_spin_lock(&import->imp_lock);
1043 import->imp_generation++;
1044 cfs_spin_unlock(&import->imp_lock);
1045 class_import_put(import);
1047 EXPORT_SYMBOL(class_destroy_import);
1049 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1051 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1053 cfs_spin_lock(&exp->exp_locks_list_guard);
1055 LASSERT(lock->l_exp_refs_nr >= 0);
1057 if (lock->l_exp_refs_target != NULL &&
1058 lock->l_exp_refs_target != exp) {
1059 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1060 exp, lock, lock->l_exp_refs_target);
1062 if ((lock->l_exp_refs_nr ++) == 0) {
1063 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1064 lock->l_exp_refs_target = exp;
1066 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1067 lock, exp, lock->l_exp_refs_nr);
1068 cfs_spin_unlock(&exp->exp_locks_list_guard);
1070 EXPORT_SYMBOL(__class_export_add_lock_ref);
1072 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1074 cfs_spin_lock(&exp->exp_locks_list_guard);
1075 LASSERT(lock->l_exp_refs_nr > 0);
1076 if (lock->l_exp_refs_target != exp) {
1077 LCONSOLE_WARN("lock %p, "
1078 "mismatching export pointers: %p, %p\n",
1079 lock, lock->l_exp_refs_target, exp);
1081 if (-- lock->l_exp_refs_nr == 0) {
1082 cfs_list_del_init(&lock->l_exp_refs_link);
1083 lock->l_exp_refs_target = NULL;
1085 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1086 lock, exp, lock->l_exp_refs_nr);
1087 cfs_spin_unlock(&exp->exp_locks_list_guard);
1089 EXPORT_SYMBOL(__class_export_del_lock_ref);
1092 /* A connection defines an export context in which preallocation can
1093 be managed. This releases the export pointer reference, and returns
1094 the export handle, so the export refcount is 1 when this function
1096 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1097 struct obd_uuid *cluuid)
1099 struct obd_export *export;
1100 LASSERT(conn != NULL);
1101 LASSERT(obd != NULL);
1102 LASSERT(cluuid != NULL);
1105 export = class_new_export(obd, cluuid);
1107 RETURN(PTR_ERR(export));
1109 conn->cookie = export->exp_handle.h_cookie;
1110 class_export_put(export);
1112 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1113 cluuid->uuid, conn->cookie);
1116 EXPORT_SYMBOL(class_connect);
1118 /* if export is involved in recovery then clean up related things */
1119 void class_export_recovery_cleanup(struct obd_export *exp)
1121 struct obd_device *obd = exp->exp_obd;
1123 cfs_spin_lock(&obd->obd_recovery_task_lock);
1124 if (exp->exp_delayed)
1125 obd->obd_delayed_clients--;
1126 if (obd->obd_recovering && exp->exp_in_recovery) {
1127 cfs_spin_lock(&exp->exp_lock);
1128 exp->exp_in_recovery = 0;
1129 cfs_spin_unlock(&exp->exp_lock);
1130 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1131 cfs_atomic_dec(&obd->obd_connected_clients);
1133 cfs_spin_unlock(&obd->obd_recovery_task_lock);
1134 /** Cleanup req replay fields */
1135 if (exp->exp_req_replay_needed) {
1136 cfs_spin_lock(&exp->exp_lock);
1137 exp->exp_req_replay_needed = 0;
1138 cfs_spin_unlock(&exp->exp_lock);
1139 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1140 cfs_atomic_dec(&obd->obd_req_replay_clients);
1142 /** Cleanup lock replay data */
1143 if (exp->exp_lock_replay_needed) {
1144 cfs_spin_lock(&exp->exp_lock);
1145 exp->exp_lock_replay_needed = 0;
1146 cfs_spin_unlock(&exp->exp_lock);
1147 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1148 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1152 /* This function removes 1-3 references from the export:
1153 * 1 - for export pointer passed
1154 * and if disconnect really need
1155 * 2 - removing from hash
1156 * 3 - in client_unlink_export
1157 * The export pointer passed to this function can destroyed */
1158 int class_disconnect(struct obd_export *export)
1160 int already_disconnected;
1163 if (export == NULL) {
1164 CWARN("attempting to free NULL export %p\n", export);
1168 cfs_spin_lock(&export->exp_lock);
1169 already_disconnected = export->exp_disconnected;
1170 export->exp_disconnected = 1;
1171 cfs_spin_unlock(&export->exp_lock);
1173 /* class_cleanup(), abort_recovery(), and class_fail_export()
1174 * all end up in here, and if any of them race we shouldn't
1175 * call extra class_export_puts(). */
1176 if (already_disconnected) {
1177 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1178 GOTO(no_disconn, already_disconnected);
1181 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1182 export->exp_handle.h_cookie);
1184 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1185 cfs_hash_del(export->exp_obd->obd_nid_hash,
1186 &export->exp_connection->c_peer.nid,
1187 &export->exp_nid_hash);
1189 class_export_recovery_cleanup(export);
1190 class_unlink_export(export);
1192 class_export_put(export);
1195 EXPORT_SYMBOL(class_disconnect);
1197 /* Return non-zero for a fully connected export */
1198 int class_connected_export(struct obd_export *exp)
1202 cfs_spin_lock(&exp->exp_lock);
1203 connected = (exp->exp_conn_cnt > 0);
1204 cfs_spin_unlock(&exp->exp_lock);
1209 EXPORT_SYMBOL(class_connected_export);
1211 static void class_disconnect_export_list(cfs_list_t *list,
1212 enum obd_option flags)
1215 struct obd_export *exp;
1218 /* It's possible that an export may disconnect itself, but
1219 * nothing else will be added to this list. */
1220 while (!cfs_list_empty(list)) {
1221 exp = cfs_list_entry(list->next, struct obd_export,
1223 /* need for safe call CDEBUG after obd_disconnect */
1224 class_export_get(exp);
1226 cfs_spin_lock(&exp->exp_lock);
1227 exp->exp_flags = flags;
1228 cfs_spin_unlock(&exp->exp_lock);
1230 if (obd_uuid_equals(&exp->exp_client_uuid,
1231 &exp->exp_obd->obd_uuid)) {
1233 "exp %p export uuid == obd uuid, don't discon\n",
1235 /* Need to delete this now so we don't end up pointing
1236 * to work_list later when this export is cleaned up. */
1237 cfs_list_del_init(&exp->exp_obd_chain);
1238 class_export_put(exp);
1242 class_export_get(exp);
1243 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1244 "last request at "CFS_TIME_T"\n",
1245 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1246 exp, exp->exp_last_request_time);
1247 /* release one export reference anyway */
1248 rc = obd_disconnect(exp);
1250 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1251 obd_export_nid2str(exp), exp, rc);
1252 class_export_put(exp);
1257 void class_disconnect_exports(struct obd_device *obd)
1259 cfs_list_t work_list;
1262 /* Move all of the exports from obd_exports to a work list, en masse. */
1263 CFS_INIT_LIST_HEAD(&work_list);
1264 cfs_spin_lock(&obd->obd_dev_lock);
1265 cfs_list_splice_init(&obd->obd_exports, &work_list);
1266 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1267 cfs_spin_unlock(&obd->obd_dev_lock);
1269 if (!cfs_list_empty(&work_list)) {
1270 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1271 "disconnecting them\n", obd->obd_minor, obd);
1272 class_disconnect_export_list(&work_list,
1273 exp_flags_from_obd(obd));
1275 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1276 obd->obd_minor, obd);
1279 EXPORT_SYMBOL(class_disconnect_exports);
1281 /* Remove exports that have not completed recovery.
1283 void class_disconnect_stale_exports(struct obd_device *obd,
1284 int (*test_export)(struct obd_export *))
1286 cfs_list_t work_list;
1287 struct obd_export *exp, *n;
1291 CFS_INIT_LIST_HEAD(&work_list);
1292 cfs_spin_lock(&obd->obd_dev_lock);
1293 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1295 /* don't count self-export as client */
1296 if (obd_uuid_equals(&exp->exp_client_uuid,
1297 &exp->exp_obd->obd_uuid))
1300 /* don't evict clients which have no slot in last_rcvd
1301 * (e.g. lightweight connection) */
1302 if (exp->exp_target_data.ted_lr_idx == -1)
1305 cfs_spin_lock(&exp->exp_lock);
1306 if (test_export(exp)) {
1307 cfs_spin_unlock(&exp->exp_lock);
1310 exp->exp_failed = 1;
1311 cfs_spin_unlock(&exp->exp_lock);
1313 cfs_list_move(&exp->exp_obd_chain, &work_list);
1315 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1316 obd->obd_name, exp->exp_client_uuid.uuid,
1317 exp->exp_connection == NULL ? "<unknown>" :
1318 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1319 print_export_data(exp, "EVICTING", 0);
1321 cfs_spin_unlock(&obd->obd_dev_lock);
1324 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1325 obd->obd_name, evicted);
1326 obd->obd_stale_clients += evicted;
1328 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1329 OBD_OPT_ABORT_RECOV);
1332 EXPORT_SYMBOL(class_disconnect_stale_exports);
1334 void class_fail_export(struct obd_export *exp)
1336 int rc, already_failed;
1338 cfs_spin_lock(&exp->exp_lock);
1339 already_failed = exp->exp_failed;
1340 exp->exp_failed = 1;
1341 cfs_spin_unlock(&exp->exp_lock);
1343 if (already_failed) {
1344 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1345 exp, exp->exp_client_uuid.uuid);
1349 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1350 exp, exp->exp_client_uuid.uuid);
1352 if (obd_dump_on_timeout)
1353 libcfs_debug_dumplog();
1355 /* need for safe call CDEBUG after obd_disconnect */
1356 class_export_get(exp);
1358 /* Most callers into obd_disconnect are removing their own reference
1359 * (request, for example) in addition to the one from the hash table.
1360 * We don't have such a reference here, so make one. */
1361 class_export_get(exp);
1362 rc = obd_disconnect(exp);
1364 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1366 CDEBUG(D_HA, "disconnected export %p/%s\n",
1367 exp, exp->exp_client_uuid.uuid);
1368 class_export_put(exp);
1370 EXPORT_SYMBOL(class_fail_export);
1372 char *obd_export_nid2str(struct obd_export *exp)
1374 if (exp->exp_connection != NULL)
1375 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1379 EXPORT_SYMBOL(obd_export_nid2str);
1381 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1383 struct obd_export *doomed_exp = NULL;
1384 int exports_evicted = 0;
1386 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1389 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1390 if (doomed_exp == NULL)
1393 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1394 "nid %s found, wanted nid %s, requested nid %s\n",
1395 obd_export_nid2str(doomed_exp),
1396 libcfs_nid2str(nid_key), nid);
1397 LASSERTF(doomed_exp != obd->obd_self_export,
1398 "self-export is hashed by NID?\n");
1400 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1401 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1403 class_fail_export(doomed_exp);
1404 class_export_put(doomed_exp);
1407 if (!exports_evicted)
1408 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1409 obd->obd_name, nid);
1410 return exports_evicted;
1412 EXPORT_SYMBOL(obd_export_evict_by_nid);
1414 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1416 struct obd_export *doomed_exp = NULL;
1417 struct obd_uuid doomed_uuid;
1418 int exports_evicted = 0;
1420 obd_str2uuid(&doomed_uuid, uuid);
1421 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1422 CERROR("%s: can't evict myself\n", obd->obd_name);
1423 return exports_evicted;
1426 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1428 if (doomed_exp == NULL) {
1429 CERROR("%s: can't disconnect %s: no exports found\n",
1430 obd->obd_name, uuid);
1432 CWARN("%s: evicting %s at adminstrative request\n",
1433 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1434 class_fail_export(doomed_exp);
1435 class_export_put(doomed_exp);
1439 return exports_evicted;
1441 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1443 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1444 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1445 EXPORT_SYMBOL(class_export_dump_hook);
1448 static void print_export_data(struct obd_export *exp, const char *status,
1451 struct ptlrpc_reply_state *rs;
1452 struct ptlrpc_reply_state *first_reply = NULL;
1455 cfs_spin_lock(&exp->exp_lock);
1456 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1462 cfs_spin_unlock(&exp->exp_lock);
1464 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1465 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1466 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1467 cfs_atomic_read(&exp->exp_rpc_count),
1468 cfs_atomic_read(&exp->exp_cb_count),
1469 cfs_atomic_read(&exp->exp_locks_count),
1470 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1471 nreplies, first_reply, nreplies > 3 ? "..." : "",
1472 exp->exp_last_committed);
1473 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1474 if (locks && class_export_dump_hook != NULL)
1475 class_export_dump_hook(exp);
1479 void dump_exports(struct obd_device *obd, int locks)
1481 struct obd_export *exp;
1483 cfs_spin_lock(&obd->obd_dev_lock);
1484 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1485 print_export_data(exp, "ACTIVE", locks);
1486 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1487 print_export_data(exp, "UNLINKED", locks);
1488 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1489 print_export_data(exp, "DELAYED", locks);
1490 cfs_spin_unlock(&obd->obd_dev_lock);
1491 cfs_spin_lock(&obd_zombie_impexp_lock);
1492 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1493 print_export_data(exp, "ZOMBIE", locks);
1494 cfs_spin_unlock(&obd_zombie_impexp_lock);
1496 EXPORT_SYMBOL(dump_exports);
1498 void obd_exports_barrier(struct obd_device *obd)
1501 LASSERT(cfs_list_empty(&obd->obd_exports));
1502 cfs_spin_lock(&obd->obd_dev_lock);
1503 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1504 cfs_spin_unlock(&obd->obd_dev_lock);
1505 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1506 cfs_time_seconds(waited));
1507 if (waited > 5 && IS_PO2(waited)) {
1508 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1509 "more than %d seconds. "
1510 "The obd refcount = %d. Is it stuck?\n",
1511 obd->obd_name, waited,
1512 cfs_atomic_read(&obd->obd_refcount));
1513 dump_exports(obd, 1);
1516 cfs_spin_lock(&obd->obd_dev_lock);
1518 cfs_spin_unlock(&obd->obd_dev_lock);
1520 EXPORT_SYMBOL(obd_exports_barrier);
1522 /* Total amount of zombies to be destroyed */
1523 static int zombies_count = 0;
1526 * kill zombie imports and exports
1528 void obd_zombie_impexp_cull(void)
1530 struct obd_import *import;
1531 struct obd_export *export;
1535 cfs_spin_lock(&obd_zombie_impexp_lock);
1538 if (!cfs_list_empty(&obd_zombie_imports)) {
1539 import = cfs_list_entry(obd_zombie_imports.next,
1542 cfs_list_del_init(&import->imp_zombie_chain);
1546 if (!cfs_list_empty(&obd_zombie_exports)) {
1547 export = cfs_list_entry(obd_zombie_exports.next,
1550 cfs_list_del_init(&export->exp_obd_chain);
1553 cfs_spin_unlock(&obd_zombie_impexp_lock);
1555 if (import != NULL) {
1556 class_import_destroy(import);
1557 cfs_spin_lock(&obd_zombie_impexp_lock);
1559 cfs_spin_unlock(&obd_zombie_impexp_lock);
1562 if (export != NULL) {
1563 class_export_destroy(export);
1564 cfs_spin_lock(&obd_zombie_impexp_lock);
1566 cfs_spin_unlock(&obd_zombie_impexp_lock);
1570 } while (import != NULL || export != NULL);
1574 static cfs_completion_t obd_zombie_start;
1575 static cfs_completion_t obd_zombie_stop;
1576 static unsigned long obd_zombie_flags;
1577 static cfs_waitq_t obd_zombie_waitq;
1578 static pid_t obd_zombie_pid;
1581 OBD_ZOMBIE_STOP = 1 << 1
1585 * check for work for kill zombie import/export thread.
1587 static int obd_zombie_impexp_check(void *arg)
1591 cfs_spin_lock(&obd_zombie_impexp_lock);
1592 rc = (zombies_count == 0) &&
1593 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1594 cfs_spin_unlock(&obd_zombie_impexp_lock);
1600 * Add export to the obd_zombe thread and notify it.
1602 static void obd_zombie_export_add(struct obd_export *exp) {
1603 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1604 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1605 cfs_list_del_init(&exp->exp_obd_chain);
1606 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1607 cfs_spin_lock(&obd_zombie_impexp_lock);
1609 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1610 cfs_spin_unlock(&obd_zombie_impexp_lock);
1612 obd_zombie_impexp_notify();
1616 * Add import to the obd_zombe thread and notify it.
1618 static void obd_zombie_import_add(struct obd_import *imp) {
1619 LASSERT(imp->imp_sec == NULL);
1620 LASSERT(imp->imp_rq_pool == NULL);
1621 cfs_spin_lock(&obd_zombie_impexp_lock);
1622 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1624 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1625 cfs_spin_unlock(&obd_zombie_impexp_lock);
1627 obd_zombie_impexp_notify();
1631 * notify import/export destroy thread about new zombie.
1633 static void obd_zombie_impexp_notify(void)
1636 * Make sure obd_zomebie_impexp_thread get this notification.
1637 * It is possible this signal only get by obd_zombie_barrier, and
1638 * barrier gulps this notification and sleeps away and hangs ensues
1640 cfs_waitq_broadcast(&obd_zombie_waitq);
1644 * check whether obd_zombie is idle
1646 static int obd_zombie_is_idle(void)
1650 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1651 cfs_spin_lock(&obd_zombie_impexp_lock);
1652 rc = (zombies_count == 0);
1653 cfs_spin_unlock(&obd_zombie_impexp_lock);
1658 * wait when obd_zombie import/export queues become empty
1660 void obd_zombie_barrier(void)
1662 struct l_wait_info lwi = { 0 };
1664 if (obd_zombie_pid == cfs_curproc_pid())
1665 /* don't wait for myself */
1667 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1669 EXPORT_SYMBOL(obd_zombie_barrier);
1674 * destroy zombie export/import thread.
1676 static int obd_zombie_impexp_thread(void *unused)
1680 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1681 cfs_complete(&obd_zombie_start);
1685 cfs_complete(&obd_zombie_start);
1687 obd_zombie_pid = cfs_curproc_pid();
1689 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1690 struct l_wait_info lwi = { 0 };
1692 l_wait_event(obd_zombie_waitq,
1693 !obd_zombie_impexp_check(NULL), &lwi);
1694 obd_zombie_impexp_cull();
1697 * Notify obd_zombie_barrier callers that queues
1700 cfs_waitq_signal(&obd_zombie_waitq);
1703 cfs_complete(&obd_zombie_stop);
1708 #else /* ! KERNEL */
1710 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1711 static void *obd_zombie_impexp_work_cb;
1712 static void *obd_zombie_impexp_idle_cb;
1714 int obd_zombie_impexp_kill(void *arg)
1718 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1719 obd_zombie_impexp_cull();
1722 cfs_atomic_dec(&zombie_recur);
1729 * start destroy zombie import/export thread
1731 int obd_zombie_impexp_init(void)
1735 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1736 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1737 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1738 cfs_init_completion(&obd_zombie_start);
1739 cfs_init_completion(&obd_zombie_stop);
1740 cfs_waitq_init(&obd_zombie_waitq);
1744 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1748 cfs_wait_for_completion(&obd_zombie_start);
1751 obd_zombie_impexp_work_cb =
1752 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1753 &obd_zombie_impexp_kill, NULL);
1755 obd_zombie_impexp_idle_cb =
1756 liblustre_register_idle_callback("obd_zombi_impexp_check",
1757 &obd_zombie_impexp_check, NULL);
1763 * stop destroy zombie import/export thread
1765 void obd_zombie_impexp_stop(void)
1767 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1768 obd_zombie_impexp_notify();
1770 cfs_wait_for_completion(&obd_zombie_stop);
1772 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1773 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1777 /***** Kernel-userspace comm helpers *******/
1779 /* Get length of entire message, including header */
1780 int kuc_len(int payload_len)
1782 return sizeof(struct kuc_hdr) + payload_len;
1784 EXPORT_SYMBOL(kuc_len);
1786 /* Get a pointer to kuc header, given a ptr to the payload
1787 * @param p Pointer to payload area
1788 * @returns Pointer to kuc header
1790 struct kuc_hdr * kuc_ptr(void *p)
1792 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1793 LASSERT(lh->kuc_magic == KUC_MAGIC);
1796 EXPORT_SYMBOL(kuc_ptr);
1798 /* Test if payload is part of kuc message
1799 * @param p Pointer to payload area
1802 int kuc_ispayload(void *p)
1804 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1806 if (kh->kuc_magic == KUC_MAGIC)
1811 EXPORT_SYMBOL(kuc_ispayload);
1813 /* Alloc space for a message, and fill in header
1814 * @return Pointer to payload area
1816 void *kuc_alloc(int payload_len, int transport, int type)
1819 int len = kuc_len(payload_len);
1823 return ERR_PTR(-ENOMEM);
1825 lh->kuc_magic = KUC_MAGIC;
1826 lh->kuc_transport = transport;
1827 lh->kuc_msgtype = type;
1828 lh->kuc_msglen = len;
1830 return (void *)(lh + 1);
1832 EXPORT_SYMBOL(kuc_alloc);
1834 /* Takes pointer to payload area */
1835 inline void kuc_free(void *p, int payload_len)
1837 struct kuc_hdr *lh = kuc_ptr(p);
1838 OBD_FREE(lh, kuc_len(payload_len));
1840 EXPORT_SYMBOL(kuc_free);