1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
32 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
39 * lustre/obdclass/genops.c
41 * These are the only exported functions, they provide some generic
42 * infrastructure for managing object devices
45 #define DEBUG_SUBSYSTEM S_CLASS
47 #include <liblustre.h>
50 #include <obd_class.h>
51 #include <lprocfs_status.h>
53 extern cfs_list_t obd_types;
54 cfs_spinlock_t obd_types_lock;
56 cfs_mem_cache_t *obd_device_cachep;
57 cfs_mem_cache_t *obdo_cachep;
58 EXPORT_SYMBOL(obdo_cachep);
59 cfs_mem_cache_t *import_cachep;
61 cfs_list_t obd_zombie_imports;
62 cfs_list_t obd_zombie_exports;
63 cfs_spinlock_t obd_zombie_impexp_lock;
64 static void obd_zombie_impexp_notify(void);
65 static void obd_zombie_export_add(struct obd_export *exp);
66 static void obd_zombie_import_add(struct obd_import *imp);
67 static void print_export_data(struct obd_export *exp,
68 const char *status, int locks);
70 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
73 * support functions: we could use inter-module communication, but this
74 * is more portable to other OS's
76 static struct obd_device *obd_device_alloc(void)
78 struct obd_device *obd;
80 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
82 obd->obd_magic = OBD_DEVICE_MAGIC;
87 static void obd_device_free(struct obd_device *obd)
90 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
91 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
92 if (obd->obd_namespace != NULL) {
93 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
94 obd, obd->obd_namespace, obd->obd_force);
97 lu_ref_fini(&obd->obd_reference);
98 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
101 struct obd_type *class_search_type(const char *name)
104 struct obd_type *type;
106 cfs_spin_lock(&obd_types_lock);
107 cfs_list_for_each(tmp, &obd_types) {
108 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
109 if (strcmp(type->typ_name, name) == 0) {
110 cfs_spin_unlock(&obd_types_lock);
114 cfs_spin_unlock(&obd_types_lock);
118 struct obd_type *class_get_type(const char *name)
120 struct obd_type *type = class_search_type(name);
122 #ifdef HAVE_MODULE_LOADING_SUPPORT
124 const char *modname = name;
125 if (!cfs_request_module("%s", modname)) {
126 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127 type = class_search_type(name);
129 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
135 cfs_spin_lock(&type->obd_type_lock);
137 cfs_try_module_get(type->typ_dt_ops->o_owner);
138 cfs_spin_unlock(&type->obd_type_lock);
143 void class_put_type(struct obd_type *type)
146 cfs_spin_lock(&type->obd_type_lock);
148 cfs_module_put(type->typ_dt_ops->o_owner);
149 cfs_spin_unlock(&type->obd_type_lock);
152 #define CLASS_MAX_NAME 1024
154 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
155 struct lprocfs_vars *vars, const char *name,
156 struct lu_device_type *ldt)
158 struct obd_type *type;
163 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
165 if (class_search_type(name)) {
166 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
171 OBD_ALLOC(type, sizeof(*type));
175 OBD_ALLOC_PTR(type->typ_dt_ops);
176 OBD_ALLOC_PTR(type->typ_md_ops);
177 OBD_ALLOC(type->typ_name, strlen(name) + 1);
179 if (type->typ_dt_ops == NULL ||
180 type->typ_md_ops == NULL ||
181 type->typ_name == NULL)
184 *(type->typ_dt_ops) = *dt_ops;
185 /* md_ops is optional */
187 *(type->typ_md_ops) = *md_ops;
188 strcpy(type->typ_name, name);
189 cfs_spin_lock_init(&type->obd_type_lock);
192 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
194 if (IS_ERR(type->typ_procroot)) {
195 rc = PTR_ERR(type->typ_procroot);
196 type->typ_procroot = NULL;
202 rc = lu_device_type_init(ldt);
207 cfs_spin_lock(&obd_types_lock);
208 cfs_list_add(&type->typ_chain, &obd_types);
209 cfs_spin_unlock(&obd_types_lock);
214 if (type->typ_name != NULL)
215 OBD_FREE(type->typ_name, strlen(name) + 1);
216 if (type->typ_md_ops != NULL)
217 OBD_FREE_PTR(type->typ_md_ops);
218 if (type->typ_dt_ops != NULL)
219 OBD_FREE_PTR(type->typ_dt_ops);
220 OBD_FREE(type, sizeof(*type));
224 int class_unregister_type(const char *name)
226 struct obd_type *type = class_search_type(name);
230 CERROR("unknown obd type\n");
234 if (type->typ_refcnt) {
235 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
236 /* This is a bad situation, let's make the best of it */
237 /* Remove ops, but leave the name for debugging */
238 OBD_FREE_PTR(type->typ_dt_ops);
239 OBD_FREE_PTR(type->typ_md_ops);
243 if (type->typ_procroot) {
244 lprocfs_remove(&type->typ_procroot);
248 lu_device_type_fini(type->typ_lu);
250 cfs_spin_lock(&obd_types_lock);
251 cfs_list_del(&type->typ_chain);
252 cfs_spin_unlock(&obd_types_lock);
253 OBD_FREE(type->typ_name, strlen(name) + 1);
254 if (type->typ_dt_ops != NULL)
255 OBD_FREE_PTR(type->typ_dt_ops);
256 if (type->typ_md_ops != NULL)
257 OBD_FREE_PTR(type->typ_md_ops);
258 OBD_FREE(type, sizeof(*type));
260 } /* class_unregister_type */
263 * Create a new obd device.
265 * Find an empty slot in ::obd_devs[], create a new obd device in it.
267 * \param[in] type_name obd device type string.
268 * \param[in] name obd device name.
270 * \retval NULL if create fails, otherwise return the obd device
273 struct obd_device *class_newdev(const char *type_name, const char *name)
275 struct obd_device *result = NULL;
276 struct obd_device *newdev;
277 struct obd_type *type = NULL;
279 int new_obd_minor = 0;
281 if (strlen(name) >= MAX_OBD_NAME) {
282 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
283 RETURN(ERR_PTR(-EINVAL));
286 type = class_get_type(type_name);
288 CERROR("OBD: unknown type: %s\n", type_name);
289 RETURN(ERR_PTR(-ENODEV));
292 newdev = obd_device_alloc();
293 if (newdev == NULL) {
294 class_put_type(type);
295 RETURN(ERR_PTR(-ENOMEM));
297 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
299 cfs_spin_lock(&obd_dev_lock);
300 for (i = 0; i < class_devno_max(); i++) {
301 struct obd_device *obd = class_num2obd(i);
302 if (obd && obd->obd_name &&
303 (strcmp(name, obd->obd_name) == 0)) {
304 CERROR("Device %s already exists, won't add\n", name);
306 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
307 "%p obd_magic %08x != %08x\n", result,
308 result->obd_magic, OBD_DEVICE_MAGIC);
309 LASSERTF(result->obd_minor == new_obd_minor,
310 "%p obd_minor %d != %d\n", result,
311 result->obd_minor, new_obd_minor);
313 obd_devs[result->obd_minor] = NULL;
314 result->obd_name[0]='\0';
316 result = ERR_PTR(-EEXIST);
319 if (!result && !obd) {
321 result->obd_minor = i;
323 result->obd_type = type;
324 strncpy(result->obd_name, name,
325 sizeof(result->obd_name) - 1);
326 obd_devs[i] = result;
329 cfs_spin_unlock(&obd_dev_lock);
331 if (result == NULL && i >= class_devno_max()) {
332 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
334 result = ERR_PTR(-EOVERFLOW);
337 if (IS_ERR(result)) {
338 obd_device_free(newdev);
339 class_put_type(type);
341 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
342 result->obd_name, result);
347 void class_release_dev(struct obd_device *obd)
349 struct obd_type *obd_type = obd->obd_type;
351 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
352 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
353 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
354 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
355 LASSERT(obd_type != NULL);
357 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
358 obd->obd_name,obd->obd_type->typ_name);
360 cfs_spin_lock(&obd_dev_lock);
361 obd_devs[obd->obd_minor] = NULL;
362 cfs_spin_unlock(&obd_dev_lock);
363 obd_device_free(obd);
365 class_put_type(obd_type);
368 int class_name2dev(const char *name)
375 cfs_spin_lock(&obd_dev_lock);
376 for (i = 0; i < class_devno_max(); i++) {
377 struct obd_device *obd = class_num2obd(i);
378 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
379 /* Make sure we finished attaching before we give
380 out any references */
381 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
382 if (obd->obd_attached) {
383 cfs_spin_unlock(&obd_dev_lock);
389 cfs_spin_unlock(&obd_dev_lock);
394 struct obd_device *class_name2obd(const char *name)
396 int dev = class_name2dev(name);
398 if (dev < 0 || dev > class_devno_max())
400 return class_num2obd(dev);
403 int class_uuid2dev(struct obd_uuid *uuid)
407 cfs_spin_lock(&obd_dev_lock);
408 for (i = 0; i < class_devno_max(); i++) {
409 struct obd_device *obd = class_num2obd(i);
410 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
411 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
412 cfs_spin_unlock(&obd_dev_lock);
416 cfs_spin_unlock(&obd_dev_lock);
421 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
423 int dev = class_uuid2dev(uuid);
426 return class_num2obd(dev);
430 * Get obd device from ::obd_devs[]
432 * \param num [in] array index
434 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
435 * otherwise return the obd device there.
437 struct obd_device *class_num2obd(int num)
439 struct obd_device *obd = NULL;
441 if (num < class_devno_max()) {
446 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
447 "%p obd_magic %08x != %08x\n",
448 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
449 LASSERTF(obd->obd_minor == num,
450 "%p obd_minor %0d != %0d\n",
451 obd, obd->obd_minor, num);
457 void class_obd_list(void)
462 cfs_spin_lock(&obd_dev_lock);
463 for (i = 0; i < class_devno_max(); i++) {
464 struct obd_device *obd = class_num2obd(i);
467 if (obd->obd_stopping)
469 else if (obd->obd_set_up)
471 else if (obd->obd_attached)
475 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
476 i, status, obd->obd_type->typ_name,
477 obd->obd_name, obd->obd_uuid.uuid,
478 cfs_atomic_read(&obd->obd_refcount));
480 cfs_spin_unlock(&obd_dev_lock);
484 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
485 specified, then only the client with that uuid is returned,
486 otherwise any client connected to the tgt is returned. */
487 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
488 const char * typ_name,
489 struct obd_uuid *grp_uuid)
493 cfs_spin_lock(&obd_dev_lock);
494 for (i = 0; i < class_devno_max(); i++) {
495 struct obd_device *obd = class_num2obd(i);
498 if ((strncmp(obd->obd_type->typ_name, typ_name,
499 strlen(typ_name)) == 0)) {
500 if (obd_uuid_equals(tgt_uuid,
501 &obd->u.cli.cl_target_uuid) &&
502 ((grp_uuid)? obd_uuid_equals(grp_uuid,
503 &obd->obd_uuid) : 1)) {
504 cfs_spin_unlock(&obd_dev_lock);
509 cfs_spin_unlock(&obd_dev_lock);
514 /* Iterate the obd_device list looking devices have grp_uuid. Start
515 searching at *next, and if a device is found, the next index to look
516 at is saved in *next. If next is NULL, then the first matching device
517 will always be returned. */
518 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
524 else if (*next >= 0 && *next < class_devno_max())
529 cfs_spin_lock(&obd_dev_lock);
530 for (; i < class_devno_max(); i++) {
531 struct obd_device *obd = class_num2obd(i);
534 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
537 cfs_spin_unlock(&obd_dev_lock);
541 cfs_spin_unlock(&obd_dev_lock);
547 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
548 * adjust sptlrpc settings accordingly.
550 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
552 struct obd_device *obd;
556 LASSERT(namelen > 0);
558 cfs_spin_lock(&obd_dev_lock);
559 for (i = 0; i < class_devno_max(); i++) {
560 obd = class_num2obd(i);
562 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
565 /* only notify mdc, osc, mdt, ost */
566 type = obd->obd_type->typ_name;
567 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
568 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
569 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
570 strcmp(type, LUSTRE_OST_NAME) != 0)
573 if (strncmp(obd->obd_name, fsname, namelen))
576 class_incref(obd, __FUNCTION__, obd);
577 cfs_spin_unlock(&obd_dev_lock);
578 rc2 = obd_set_info_async(obd->obd_self_export,
579 sizeof(KEY_SPTLRPC_CONF),
580 KEY_SPTLRPC_CONF, 0, NULL, NULL);
582 class_decref(obd, __FUNCTION__, obd);
583 cfs_spin_lock(&obd_dev_lock);
585 cfs_spin_unlock(&obd_dev_lock);
588 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
590 void obd_cleanup_caches(void)
595 if (obd_device_cachep) {
596 rc = cfs_mem_cache_destroy(obd_device_cachep);
597 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
598 obd_device_cachep = NULL;
601 rc = cfs_mem_cache_destroy(obdo_cachep);
602 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
606 rc = cfs_mem_cache_destroy(import_cachep);
607 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
608 import_cachep = NULL;
611 rc = cfs_mem_cache_destroy(capa_cachep);
612 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
618 int obd_init_caches(void)
622 LASSERT(obd_device_cachep == NULL);
623 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
624 sizeof(struct obd_device),
626 if (!obd_device_cachep)
629 LASSERT(obdo_cachep == NULL);
630 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
635 LASSERT(import_cachep == NULL);
636 import_cachep = cfs_mem_cache_create("ll_import_cache",
637 sizeof(struct obd_import),
642 LASSERT(capa_cachep == NULL);
643 capa_cachep = cfs_mem_cache_create("capa_cache",
644 sizeof(struct obd_capa), 0, 0);
650 obd_cleanup_caches();
655 /* map connection to client */
656 struct obd_export *class_conn2export(struct lustre_handle *conn)
658 struct obd_export *export;
662 CDEBUG(D_CACHE, "looking for null handle\n");
666 if (conn->cookie == -1) { /* this means assign a new connection */
667 CDEBUG(D_CACHE, "want a new connection\n");
671 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
672 export = class_handle2object(conn->cookie);
676 struct obd_device *class_exp2obd(struct obd_export *exp)
683 struct obd_device *class_conn2obd(struct lustre_handle *conn)
685 struct obd_export *export;
686 export = class_conn2export(conn);
688 struct obd_device *obd = export->exp_obd;
689 class_export_put(export);
695 struct obd_import *class_exp2cliimp(struct obd_export *exp)
697 struct obd_device *obd = exp->exp_obd;
700 return obd->u.cli.cl_import;
703 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
705 struct obd_device *obd = class_conn2obd(conn);
708 return obd->u.cli.cl_import;
711 /* Export management functions */
713 /* if export is involved in recovery then clean up related things */
714 void class_export_recovery_cleanup(struct obd_export *exp)
716 struct obd_device *obd = exp->exp_obd;
718 cfs_spin_lock(&obd->obd_recovery_task_lock);
719 if (exp->exp_delayed)
720 obd->obd_delayed_clients--;
721 if (obd->obd_recovering && exp->exp_in_recovery) {
722 cfs_spin_lock(&exp->exp_lock);
723 exp->exp_in_recovery = 0;
724 cfs_spin_unlock(&exp->exp_lock);
725 LASSERT(obd->obd_connected_clients);
726 obd->obd_connected_clients--;
728 cfs_spin_unlock(&obd->obd_recovery_task_lock);
729 /** Cleanup req replay fields */
730 if (exp->exp_req_replay_needed) {
731 cfs_spin_lock(&exp->exp_lock);
732 exp->exp_req_replay_needed = 0;
733 cfs_spin_unlock(&exp->exp_lock);
734 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
735 cfs_atomic_dec(&obd->obd_req_replay_clients);
737 /** Cleanup lock replay data */
738 if (exp->exp_lock_replay_needed) {
739 cfs_spin_lock(&exp->exp_lock);
740 exp->exp_lock_replay_needed = 0;
741 cfs_spin_unlock(&exp->exp_lock);
742 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
743 cfs_atomic_dec(&obd->obd_lock_replay_clients);
747 static void class_export_destroy(struct obd_export *exp)
749 struct obd_device *obd = exp->exp_obd;
752 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
753 LASSERT(obd != NULL);
755 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
756 exp->exp_client_uuid.uuid, obd->obd_name);
759 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
760 if (exp->exp_connection)
761 ptlrpc_put_connection_superhack(exp->exp_connection);
763 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
764 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
765 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
766 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
767 obd_destroy_export(exp);
768 class_decref(obd, "export", exp);
770 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
774 static void export_handle_addref(void *export)
776 class_export_get(export);
779 struct obd_export *class_export_get(struct obd_export *exp)
781 cfs_atomic_inc(&exp->exp_refcount);
782 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
783 cfs_atomic_read(&exp->exp_refcount));
786 EXPORT_SYMBOL(class_export_get);
788 void class_export_put(struct obd_export *exp)
790 LASSERT(exp != NULL);
791 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
792 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
793 cfs_atomic_read(&exp->exp_refcount) - 1);
795 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
796 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
797 CDEBUG(D_IOCTL, "final put %p/%s\n",
798 exp, exp->exp_client_uuid.uuid);
800 /* release nid stat refererence */
801 lprocfs_exp_cleanup(exp);
802 class_export_recovery_cleanup(exp);
804 obd_zombie_export_add(exp);
807 EXPORT_SYMBOL(class_export_put);
809 /* Creates a new export, adds it to the hash table, and returns a
810 * pointer to it. The refcount is 2: one for the hash reference, and
811 * one for the pointer returned by this function. */
812 struct obd_export *class_new_export(struct obd_device *obd,
813 struct obd_uuid *cluuid)
815 struct obd_export *export;
816 cfs_hash_t *hash = NULL;
820 OBD_ALLOC_PTR(export);
822 return ERR_PTR(-ENOMEM);
824 export->exp_conn_cnt = 0;
825 export->exp_lock_hash = NULL;
826 cfs_atomic_set(&export->exp_refcount, 2);
827 cfs_atomic_set(&export->exp_rpc_count, 0);
828 cfs_atomic_set(&export->exp_cb_count, 0);
829 cfs_atomic_set(&export->exp_locks_count, 0);
830 #if LUSTRE_TRACKS_LOCK_EXP_REFS
831 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
832 cfs_spin_lock_init(&export->exp_locks_list_guard);
834 cfs_atomic_set(&export->exp_replay_count, 0);
835 export->exp_obd = obd;
836 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
837 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
838 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
839 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
840 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
841 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
842 class_handle_hash(&export->exp_handle, export_handle_addref);
843 export->exp_last_request_time = cfs_time_current_sec();
844 cfs_spin_lock_init(&export->exp_lock);
845 cfs_spin_lock_init(&export->exp_rpc_lock);
846 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
847 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
848 cfs_spin_lock_init(&export->exp_bl_list_lock);
849 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
851 export->exp_sp_peer = LUSTRE_SP_ANY;
852 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
853 export->exp_client_uuid = *cluuid;
854 obd_init_export(export);
856 cfs_spin_lock(&obd->obd_dev_lock);
857 /* shouldn't happen, but might race */
858 if (obd->obd_stopping)
859 GOTO(exit_unlock, rc = -ENODEV);
861 hash = cfs_hash_getref(obd->obd_uuid_hash);
863 GOTO(exit_unlock, rc = -ENODEV);
864 cfs_spin_unlock(&obd->obd_dev_lock);
866 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
867 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
869 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
870 obd->obd_name, cluuid->uuid, rc);
871 GOTO(exit_err, rc = -EALREADY);
875 cfs_spin_lock(&obd->obd_dev_lock);
876 if (obd->obd_stopping) {
877 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
878 GOTO(exit_unlock, rc = -ENODEV);
881 class_incref(obd, "export", export);
882 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
883 cfs_list_add_tail(&export->exp_obd_chain_timed,
884 &export->exp_obd->obd_exports_timed);
885 export->exp_obd->obd_num_exports++;
886 cfs_spin_unlock(&obd->obd_dev_lock);
887 cfs_hash_putref(hash);
891 cfs_spin_unlock(&obd->obd_dev_lock);
894 cfs_hash_putref(hash);
895 class_handle_unhash(&export->exp_handle);
896 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
897 obd_destroy_export(export);
898 OBD_FREE_PTR(export);
901 EXPORT_SYMBOL(class_new_export);
903 void class_unlink_export(struct obd_export *exp)
905 class_handle_unhash(&exp->exp_handle);
907 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
908 /* delete an uuid-export hashitem from hashtables */
909 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
910 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
911 &exp->exp_client_uuid,
912 &exp->exp_uuid_hash);
914 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
915 cfs_list_del_init(&exp->exp_obd_chain_timed);
916 exp->exp_obd->obd_num_exports--;
917 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
918 class_export_put(exp);
920 EXPORT_SYMBOL(class_unlink_export);
922 /* Import management functions */
923 void class_import_destroy(struct obd_import *imp)
927 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
928 imp->imp_obd->obd_name);
930 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
932 ptlrpc_put_connection_superhack(imp->imp_connection);
934 while (!cfs_list_empty(&imp->imp_conn_list)) {
935 struct obd_import_conn *imp_conn;
937 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
938 struct obd_import_conn, oic_item);
939 cfs_list_del_init(&imp_conn->oic_item);
940 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
941 OBD_FREE(imp_conn, sizeof(*imp_conn));
944 LASSERT(imp->imp_sec == NULL);
945 class_decref(imp->imp_obd, "import", imp);
946 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
950 static void import_handle_addref(void *import)
952 class_import_get(import);
955 struct obd_import *class_import_get(struct obd_import *import)
957 cfs_atomic_inc(&import->imp_refcount);
958 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
959 cfs_atomic_read(&import->imp_refcount),
960 import->imp_obd->obd_name);
963 EXPORT_SYMBOL(class_import_get);
965 void class_import_put(struct obd_import *imp)
969 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
970 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
972 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
973 cfs_atomic_read(&imp->imp_refcount) - 1,
974 imp->imp_obd->obd_name);
976 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
977 CDEBUG(D_INFO, "final put import %p\n", imp);
978 obd_zombie_import_add(imp);
983 EXPORT_SYMBOL(class_import_put);
985 static void init_imp_at(struct imp_at *at) {
987 at_init(&at->iat_net_latency, 0, 0);
988 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
989 /* max service estimates are tracked on the server side, so
990 don't use the AT history here, just use the last reported
991 val. (But keep hist for proc histogram, worst_ever) */
992 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
997 struct obd_import *class_new_import(struct obd_device *obd)
999 struct obd_import *imp;
1001 OBD_ALLOC(imp, sizeof(*imp));
1005 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1006 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1007 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1008 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1009 cfs_spin_lock_init(&imp->imp_lock);
1010 imp->imp_last_success_conn = 0;
1011 imp->imp_state = LUSTRE_IMP_NEW;
1012 imp->imp_obd = class_incref(obd, "import", imp);
1013 cfs_sema_init(&imp->imp_sec_mutex, 1);
1014 cfs_waitq_init(&imp->imp_recovery_waitq);
1016 cfs_atomic_set(&imp->imp_refcount, 2);
1017 cfs_atomic_set(&imp->imp_unregistering, 0);
1018 cfs_atomic_set(&imp->imp_inflight, 0);
1019 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1020 cfs_atomic_set(&imp->imp_inval_count, 0);
1021 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1022 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1023 class_handle_hash(&imp->imp_handle, import_handle_addref);
1024 init_imp_at(&imp->imp_at);
1026 /* the default magic is V2, will be used in connect RPC, and
1027 * then adjusted according to the flags in request/reply. */
1028 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1032 EXPORT_SYMBOL(class_new_import);
1034 void class_destroy_import(struct obd_import *import)
1036 LASSERT(import != NULL);
1037 LASSERT(import != LP_POISON);
1039 class_handle_unhash(&import->imp_handle);
1041 cfs_spin_lock(&import->imp_lock);
1042 import->imp_generation++;
1043 cfs_spin_unlock(&import->imp_lock);
1044 class_import_put(import);
1046 EXPORT_SYMBOL(class_destroy_import);
1048 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1050 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1052 cfs_spin_lock(&exp->exp_locks_list_guard);
1054 LASSERT(lock->l_exp_refs_nr >= 0);
1056 if (lock->l_exp_refs_target != NULL &&
1057 lock->l_exp_refs_target != exp) {
1058 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1059 exp, lock, lock->l_exp_refs_target);
1061 if ((lock->l_exp_refs_nr ++) == 0) {
1062 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1063 lock->l_exp_refs_target = exp;
1065 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1066 lock, exp, lock->l_exp_refs_nr);
1067 cfs_spin_unlock(&exp->exp_locks_list_guard);
1069 EXPORT_SYMBOL(__class_export_add_lock_ref);
1071 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1073 cfs_spin_lock(&exp->exp_locks_list_guard);
1074 LASSERT(lock->l_exp_refs_nr > 0);
1075 if (lock->l_exp_refs_target != exp) {
1076 LCONSOLE_WARN("lock %p, "
1077 "mismatching export pointers: %p, %p\n",
1078 lock, lock->l_exp_refs_target, exp);
1080 if (-- lock->l_exp_refs_nr == 0) {
1081 cfs_list_del_init(&lock->l_exp_refs_link);
1082 lock->l_exp_refs_target = NULL;
1084 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1085 lock, exp, lock->l_exp_refs_nr);
1086 cfs_spin_unlock(&exp->exp_locks_list_guard);
1088 EXPORT_SYMBOL(__class_export_del_lock_ref);
1091 /* A connection defines an export context in which preallocation can
1092 be managed. This releases the export pointer reference, and returns
1093 the export handle, so the export refcount is 1 when this function
1095 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1096 struct obd_uuid *cluuid)
1098 struct obd_export *export;
1099 LASSERT(conn != NULL);
1100 LASSERT(obd != NULL);
1101 LASSERT(cluuid != NULL);
1104 export = class_new_export(obd, cluuid);
1106 RETURN(PTR_ERR(export));
1108 conn->cookie = export->exp_handle.h_cookie;
1109 class_export_put(export);
1111 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1112 cluuid->uuid, conn->cookie);
1115 EXPORT_SYMBOL(class_connect);
1117 /* This function removes 1-3 references from the export:
1118 * 1 - for export pointer passed
1119 * and if disconnect really need
1120 * 2 - removing from hash
1121 * 3 - in client_unlink_export
1122 * The export pointer passed to this function can destroyed */
1123 int class_disconnect(struct obd_export *export)
1125 int already_disconnected;
1128 if (export == NULL) {
1130 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1134 cfs_spin_lock(&export->exp_lock);
1135 already_disconnected = export->exp_disconnected;
1136 export->exp_disconnected = 1;
1137 cfs_spin_unlock(&export->exp_lock);
1139 /* class_cleanup(), abort_recovery(), and class_fail_export()
1140 * all end up in here, and if any of them race we shouldn't
1141 * call extra class_export_puts(). */
1142 if (already_disconnected) {
1143 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1144 GOTO(no_disconn, already_disconnected);
1147 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1148 export->exp_handle.h_cookie);
1150 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1151 cfs_hash_del(export->exp_obd->obd_nid_hash,
1152 &export->exp_connection->c_peer.nid,
1153 &export->exp_nid_hash);
1155 class_unlink_export(export);
1157 class_export_put(export);
1161 /* Return non-zero for a fully connected export */
1162 int class_connected_export(struct obd_export *exp)
1166 cfs_spin_lock(&exp->exp_lock);
1167 connected = (exp->exp_conn_cnt > 0);
1168 cfs_spin_unlock(&exp->exp_lock);
1173 EXPORT_SYMBOL(class_connected_export);
1175 static void class_disconnect_export_list(cfs_list_t *list,
1176 enum obd_option flags)
1179 struct obd_export *exp;
1182 /* It's possible that an export may disconnect itself, but
1183 * nothing else will be added to this list. */
1184 while (!cfs_list_empty(list)) {
1185 exp = cfs_list_entry(list->next, struct obd_export,
1187 /* need for safe call CDEBUG after obd_disconnect */
1188 class_export_get(exp);
1190 cfs_spin_lock(&exp->exp_lock);
1191 exp->exp_flags = flags;
1192 cfs_spin_unlock(&exp->exp_lock);
1194 if (obd_uuid_equals(&exp->exp_client_uuid,
1195 &exp->exp_obd->obd_uuid)) {
1197 "exp %p export uuid == obd uuid, don't discon\n",
1199 /* Need to delete this now so we don't end up pointing
1200 * to work_list later when this export is cleaned up. */
1201 cfs_list_del_init(&exp->exp_obd_chain);
1202 class_export_put(exp);
1206 class_export_get(exp);
1207 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1208 "last request at "CFS_TIME_T"\n",
1209 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1210 exp, exp->exp_last_request_time);
1211 /* release one export reference anyway */
1212 rc = obd_disconnect(exp);
1214 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1215 obd_export_nid2str(exp), exp, rc);
1216 class_export_put(exp);
1221 void class_disconnect_exports(struct obd_device *obd)
1223 cfs_list_t work_list;
1226 /* Move all of the exports from obd_exports to a work list, en masse. */
1227 CFS_INIT_LIST_HEAD(&work_list);
1228 cfs_spin_lock(&obd->obd_dev_lock);
1229 cfs_list_splice_init(&obd->obd_exports, &work_list);
1230 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1231 cfs_spin_unlock(&obd->obd_dev_lock);
1233 if (!cfs_list_empty(&work_list)) {
1234 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1235 "disconnecting them\n", obd->obd_minor, obd);
1236 class_disconnect_export_list(&work_list,
1237 exp_flags_from_obd(obd));
1239 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1240 obd->obd_minor, obd);
1243 EXPORT_SYMBOL(class_disconnect_exports);
1245 /* Remove exports that have not completed recovery.
1247 void class_disconnect_stale_exports(struct obd_device *obd,
1248 int (*test_export)(struct obd_export *))
1250 cfs_list_t work_list;
1251 cfs_list_t *pos, *n;
1252 struct obd_export *exp;
1256 CFS_INIT_LIST_HEAD(&work_list);
1257 cfs_spin_lock(&obd->obd_dev_lock);
1258 cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1261 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1263 /* don't count self-export as client */
1264 if (obd_uuid_equals(&exp->exp_client_uuid,
1265 &exp->exp_obd->obd_uuid))
1268 if (test_export(exp))
1271 cfs_spin_lock(&exp->exp_lock);
1272 failed = exp->exp_failed;
1273 exp->exp_failed = 1;
1274 cfs_spin_unlock(&exp->exp_lock);
1278 cfs_list_move(&exp->exp_obd_chain, &work_list);
1280 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1281 obd->obd_name, exp->exp_client_uuid.uuid,
1282 exp->exp_connection == NULL ? "<unknown>" :
1283 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1284 print_export_data(exp, "EVICTING", 0);
1286 cfs_spin_unlock(&obd->obd_dev_lock);
1289 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1290 obd->obd_name, evicted);
1291 obd->obd_stale_clients += evicted;
1293 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1294 OBD_OPT_ABORT_RECOV);
1297 EXPORT_SYMBOL(class_disconnect_stale_exports);
1299 void class_fail_export(struct obd_export *exp)
1301 int rc, already_failed;
1303 cfs_spin_lock(&exp->exp_lock);
1304 already_failed = exp->exp_failed;
1305 exp->exp_failed = 1;
1306 cfs_spin_unlock(&exp->exp_lock);
1308 if (already_failed) {
1309 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1310 exp, exp->exp_client_uuid.uuid);
1314 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1315 exp, exp->exp_client_uuid.uuid);
1317 if (obd_dump_on_timeout)
1318 libcfs_debug_dumplog();
1320 /* need for safe call CDEBUG after obd_disconnect */
1321 class_export_get(exp);
1323 /* Most callers into obd_disconnect are removing their own reference
1324 * (request, for example) in addition to the one from the hash table.
1325 * We don't have such a reference here, so make one. */
1326 class_export_get(exp);
1327 rc = obd_disconnect(exp);
1329 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1331 CDEBUG(D_HA, "disconnected export %p/%s\n",
1332 exp, exp->exp_client_uuid.uuid);
1333 class_export_put(exp);
1335 EXPORT_SYMBOL(class_fail_export);
1337 char *obd_export_nid2str(struct obd_export *exp)
1339 if (exp->exp_connection != NULL)
1340 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1344 EXPORT_SYMBOL(obd_export_nid2str);
1346 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1348 struct obd_export *doomed_exp = NULL;
1349 int exports_evicted = 0;
1351 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1354 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1355 if (doomed_exp == NULL)
1358 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1359 "nid %s found, wanted nid %s, requested nid %s\n",
1360 obd_export_nid2str(doomed_exp),
1361 libcfs_nid2str(nid_key), nid);
1362 LASSERTF(doomed_exp != obd->obd_self_export,
1363 "self-export is hashed by NID?\n");
1365 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1366 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1368 class_fail_export(doomed_exp);
1369 class_export_put(doomed_exp);
1372 if (!exports_evicted)
1373 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1374 obd->obd_name, nid);
1375 return exports_evicted;
1377 EXPORT_SYMBOL(obd_export_evict_by_nid);
1379 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1381 struct obd_export *doomed_exp = NULL;
1382 struct obd_uuid doomed_uuid;
1383 int exports_evicted = 0;
1385 obd_str2uuid(&doomed_uuid, uuid);
1386 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1387 CERROR("%s: can't evict myself\n", obd->obd_name);
1388 return exports_evicted;
1391 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1393 if (doomed_exp == NULL) {
1394 CERROR("%s: can't disconnect %s: no exports found\n",
1395 obd->obd_name, uuid);
1397 CWARN("%s: evicting %s at adminstrative request\n",
1398 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1399 class_fail_export(doomed_exp);
1400 class_export_put(doomed_exp);
1404 return exports_evicted;
1406 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1408 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1409 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1410 EXPORT_SYMBOL(class_export_dump_hook);
1413 static void print_export_data(struct obd_export *exp, const char *status,
1416 struct ptlrpc_reply_state *rs;
1417 struct ptlrpc_reply_state *first_reply = NULL;
1420 cfs_spin_lock(&exp->exp_lock);
1421 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1427 cfs_spin_unlock(&exp->exp_lock);
1429 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1430 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1431 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1432 cfs_atomic_read(&exp->exp_rpc_count),
1433 cfs_atomic_read(&exp->exp_cb_count),
1434 cfs_atomic_read(&exp->exp_locks_count),
1435 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1436 nreplies, first_reply, nreplies > 3 ? "..." : "",
1437 exp->exp_last_committed);
1438 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1439 if (locks && class_export_dump_hook != NULL)
1440 class_export_dump_hook(exp);
1444 void dump_exports(struct obd_device *obd, int locks)
1446 struct obd_export *exp;
1448 cfs_spin_lock(&obd->obd_dev_lock);
1449 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1450 print_export_data(exp, "ACTIVE", locks);
1451 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1452 print_export_data(exp, "UNLINKED", locks);
1453 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1454 print_export_data(exp, "DELAYED", locks);
1455 cfs_spin_unlock(&obd->obd_dev_lock);
1456 cfs_spin_lock(&obd_zombie_impexp_lock);
1457 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1458 print_export_data(exp, "ZOMBIE", locks);
1459 cfs_spin_unlock(&obd_zombie_impexp_lock);
1461 EXPORT_SYMBOL(dump_exports);
1463 void obd_exports_barrier(struct obd_device *obd)
1466 LASSERT(cfs_list_empty(&obd->obd_exports));
1467 cfs_spin_lock(&obd->obd_dev_lock);
1468 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1469 cfs_spin_unlock(&obd->obd_dev_lock);
1470 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1471 cfs_time_seconds(waited));
1472 if (waited > 5 && IS_PO2(waited)) {
1473 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1474 "more than %d seconds. "
1475 "The obd refcount = %d. Is it stuck?\n",
1476 obd->obd_name, waited,
1477 cfs_atomic_read(&obd->obd_refcount));
1478 dump_exports(obd, 1);
1481 cfs_spin_lock(&obd->obd_dev_lock);
1483 cfs_spin_unlock(&obd->obd_dev_lock);
1485 EXPORT_SYMBOL(obd_exports_barrier);
1487 /* Total amount of zombies to be destroyed */
1488 static int zombies_count = 0;
1491 * kill zombie imports and exports
1493 void obd_zombie_impexp_cull(void)
1495 struct obd_import *import;
1496 struct obd_export *export;
1500 cfs_spin_lock(&obd_zombie_impexp_lock);
1503 if (!cfs_list_empty(&obd_zombie_imports)) {
1504 import = cfs_list_entry(obd_zombie_imports.next,
1507 cfs_list_del_init(&import->imp_zombie_chain);
1511 if (!cfs_list_empty(&obd_zombie_exports)) {
1512 export = cfs_list_entry(obd_zombie_exports.next,
1515 cfs_list_del_init(&export->exp_obd_chain);
1518 cfs_spin_unlock(&obd_zombie_impexp_lock);
1520 if (import != NULL) {
1521 class_import_destroy(import);
1522 cfs_spin_lock(&obd_zombie_impexp_lock);
1524 cfs_spin_unlock(&obd_zombie_impexp_lock);
1527 if (export != NULL) {
1528 class_export_destroy(export);
1529 cfs_spin_lock(&obd_zombie_impexp_lock);
1531 cfs_spin_unlock(&obd_zombie_impexp_lock);
1535 } while (import != NULL || export != NULL);
1539 static cfs_completion_t obd_zombie_start;
1540 static cfs_completion_t obd_zombie_stop;
1541 static unsigned long obd_zombie_flags;
1542 static cfs_waitq_t obd_zombie_waitq;
1543 static pid_t obd_zombie_pid;
1546 OBD_ZOMBIE_STOP = 1 << 1
1550 * check for work for kill zombie import/export thread.
1552 static int obd_zombie_impexp_check(void *arg)
1556 cfs_spin_lock(&obd_zombie_impexp_lock);
1557 rc = (zombies_count == 0) &&
1558 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1559 cfs_spin_unlock(&obd_zombie_impexp_lock);
1565 * Add export to the obd_zombe thread and notify it.
1567 static void obd_zombie_export_add(struct obd_export *exp) {
1568 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1569 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1570 cfs_list_del_init(&exp->exp_obd_chain);
1571 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1572 cfs_spin_lock(&obd_zombie_impexp_lock);
1574 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1575 cfs_spin_unlock(&obd_zombie_impexp_lock);
1577 if (obd_zombie_impexp_notify != NULL)
1578 obd_zombie_impexp_notify();
1582 * Add import to the obd_zombe thread and notify it.
1584 static void obd_zombie_import_add(struct obd_import *imp) {
1585 LASSERT(imp->imp_sec == NULL);
1586 LASSERT(imp->imp_rq_pool == NULL);
1587 cfs_spin_lock(&obd_zombie_impexp_lock);
1588 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1590 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1591 cfs_spin_unlock(&obd_zombie_impexp_lock);
1593 if (obd_zombie_impexp_notify != NULL)
1594 obd_zombie_impexp_notify();
1598 * notify import/export destroy thread about new zombie.
1600 static void obd_zombie_impexp_notify(void)
1603 * Make sure obd_zomebie_impexp_thread get this notification.
1604 * It is possible this signal only get by obd_zombie_barrier, and
1605 * barrier gulps this notification and sleeps away and hangs ensues
1607 cfs_waitq_broadcast(&obd_zombie_waitq);
1611 * check whether obd_zombie is idle
1613 static int obd_zombie_is_idle(void)
1617 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1618 cfs_spin_lock(&obd_zombie_impexp_lock);
1619 rc = (zombies_count == 0);
1620 cfs_spin_unlock(&obd_zombie_impexp_lock);
1625 * wait when obd_zombie import/export queues become empty
1627 void obd_zombie_barrier(void)
1629 struct l_wait_info lwi = { 0 };
1631 if (obd_zombie_pid == cfs_curproc_pid())
1632 /* don't wait for myself */
1634 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1636 EXPORT_SYMBOL(obd_zombie_barrier);
1641 * destroy zombie export/import thread.
1643 static int obd_zombie_impexp_thread(void *unused)
1647 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1648 cfs_complete(&obd_zombie_start);
1652 cfs_complete(&obd_zombie_start);
1654 obd_zombie_pid = cfs_curproc_pid();
1656 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1657 struct l_wait_info lwi = { 0 };
1659 l_wait_event(obd_zombie_waitq,
1660 !obd_zombie_impexp_check(NULL), &lwi);
1661 obd_zombie_impexp_cull();
1664 * Notify obd_zombie_barrier callers that queues
1667 cfs_waitq_signal(&obd_zombie_waitq);
1670 cfs_complete(&obd_zombie_stop);
1675 #else /* ! KERNEL */
1677 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1678 static void *obd_zombie_impexp_work_cb;
1679 static void *obd_zombie_impexp_idle_cb;
1681 int obd_zombie_impexp_kill(void *arg)
1685 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1686 obd_zombie_impexp_cull();
1689 cfs_atomic_dec(&zombie_recur);
1696 * start destroy zombie import/export thread
1698 int obd_zombie_impexp_init(void)
1702 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1703 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1704 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1705 cfs_init_completion(&obd_zombie_start);
1706 cfs_init_completion(&obd_zombie_stop);
1707 cfs_waitq_init(&obd_zombie_waitq);
1711 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1715 cfs_wait_for_completion(&obd_zombie_start);
1718 obd_zombie_impexp_work_cb =
1719 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1720 &obd_zombie_impexp_kill, NULL);
1722 obd_zombie_impexp_idle_cb =
1723 liblustre_register_idle_callback("obd_zombi_impexp_check",
1724 &obd_zombie_impexp_check, NULL);
1730 * stop destroy zombie import/export thread
1732 void obd_zombie_impexp_stop(void)
1734 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1735 obd_zombie_impexp_notify();
1737 cfs_wait_for_completion(&obd_zombie_stop);
1739 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1740 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1744 /***** Kernel-userspace comm helpers *******/
1746 /* Get length of entire message, including header */
1747 int kuc_len(int payload_len)
1749 return sizeof(struct kuc_hdr) + payload_len;
1751 EXPORT_SYMBOL(kuc_len);
1753 /* Get a pointer to kuc header, given a ptr to the payload
1754 * @param p Pointer to payload area
1755 * @returns Pointer to kuc header
1757 struct kuc_hdr * kuc_ptr(void *p)
1759 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1760 LASSERT(lh->kuc_magic == KUC_MAGIC);
1763 EXPORT_SYMBOL(kuc_ptr);
1765 /* Test if payload is part of kuc message
1766 * @param p Pointer to payload area
1769 int kuc_ispayload(void *p)
1771 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1773 if (kh->kuc_magic == KUC_MAGIC)
1778 EXPORT_SYMBOL(kuc_ispayload);
1780 /* Alloc space for a message, and fill in header
1781 * @return Pointer to payload area
1783 void *kuc_alloc(int payload_len, int transport, int type)
1786 int len = kuc_len(payload_len);
1790 return ERR_PTR(-ENOMEM);
1792 lh->kuc_magic = KUC_MAGIC;
1793 lh->kuc_transport = transport;
1794 lh->kuc_msgtype = type;
1795 lh->kuc_msglen = len;
1797 return (void *)(lh + 1);
1799 EXPORT_SYMBOL(kuc_alloc);
1801 /* Takes pointer to payload area */
1802 inline void kuc_free(void *p, int payload_len)
1804 struct kuc_hdr *lh = kuc_ptr(p);
1805 OBD_FREE(lh, kuc_len(payload_len));
1807 EXPORT_SYMBOL(kuc_free);