1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
32 * Copyright (c) 2011, 2012, Whamcloud, Inc.
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
38 * lustre/obdclass/genops.c
40 * These are the only exported functions, they provide some generic
41 * infrastructure for managing object devices
44 #define DEBUG_SUBSYSTEM S_CLASS
46 #include <liblustre.h>
49 #include <obd_class.h>
50 #include <lprocfs_status.h>
52 extern cfs_list_t obd_types;
53 cfs_spinlock_t obd_types_lock;
55 cfs_mem_cache_t *obd_device_cachep;
56 cfs_mem_cache_t *obdo_cachep;
57 EXPORT_SYMBOL(obdo_cachep);
58 cfs_mem_cache_t *import_cachep;
60 cfs_list_t obd_zombie_imports;
61 cfs_list_t obd_zombie_exports;
62 cfs_spinlock_t obd_zombie_impexp_lock;
63 static void obd_zombie_impexp_notify(void);
64 static void obd_zombie_export_add(struct obd_export *exp);
65 static void obd_zombie_import_add(struct obd_import *imp);
66 static void print_export_data(struct obd_export *exp,
67 const char *status, int locks);
69 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
72 * support functions: we could use inter-module communication, but this
73 * is more portable to other OS's
75 static struct obd_device *obd_device_alloc(void)
77 struct obd_device *obd;
79 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
81 obd->obd_magic = OBD_DEVICE_MAGIC;
86 static void obd_device_free(struct obd_device *obd)
89 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
90 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
91 if (obd->obd_namespace != NULL) {
92 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
93 obd, obd->obd_namespace, obd->obd_force);
96 lu_ref_fini(&obd->obd_reference);
97 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
100 struct obd_type *class_search_type(const char *name)
103 struct obd_type *type;
105 cfs_spin_lock(&obd_types_lock);
106 cfs_list_for_each(tmp, &obd_types) {
107 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
108 if (strcmp(type->typ_name, name) == 0) {
109 cfs_spin_unlock(&obd_types_lock);
113 cfs_spin_unlock(&obd_types_lock);
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
124 if (!cfs_request_module("%s", modname)) {
125 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126 type = class_search_type(name);
128 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134 cfs_spin_lock(&type->obd_type_lock);
136 cfs_try_module_get(type->typ_dt_ops->o_owner);
137 cfs_spin_unlock(&type->obd_type_lock);
141 EXPORT_SYMBOL(class_get_type);
143 void class_put_type(struct obd_type *type)
146 cfs_spin_lock(&type->obd_type_lock);
148 cfs_module_put(type->typ_dt_ops->o_owner);
149 cfs_spin_unlock(&type->obd_type_lock);
151 EXPORT_SYMBOL(class_put_type);
153 #define CLASS_MAX_NAME 1024
155 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
156 struct lprocfs_vars *vars, const char *name,
157 struct lu_device_type *ldt)
159 struct obd_type *type;
164 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
166 if (class_search_type(name)) {
167 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
172 OBD_ALLOC(type, sizeof(*type));
176 OBD_ALLOC_PTR(type->typ_dt_ops);
177 OBD_ALLOC_PTR(type->typ_md_ops);
178 OBD_ALLOC(type->typ_name, strlen(name) + 1);
180 if (type->typ_dt_ops == NULL ||
181 type->typ_md_ops == NULL ||
182 type->typ_name == NULL)
185 *(type->typ_dt_ops) = *dt_ops;
186 /* md_ops is optional */
188 *(type->typ_md_ops) = *md_ops;
189 strcpy(type->typ_name, name);
190 cfs_spin_lock_init(&type->obd_type_lock);
193 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
195 if (IS_ERR(type->typ_procroot)) {
196 rc = PTR_ERR(type->typ_procroot);
197 type->typ_procroot = NULL;
203 rc = lu_device_type_init(ldt);
208 cfs_spin_lock(&obd_types_lock);
209 cfs_list_add(&type->typ_chain, &obd_types);
210 cfs_spin_unlock(&obd_types_lock);
215 if (type->typ_name != NULL)
216 OBD_FREE(type->typ_name, strlen(name) + 1);
217 if (type->typ_md_ops != NULL)
218 OBD_FREE_PTR(type->typ_md_ops);
219 if (type->typ_dt_ops != NULL)
220 OBD_FREE_PTR(type->typ_dt_ops);
221 OBD_FREE(type, sizeof(*type));
224 EXPORT_SYMBOL(class_register_type);
226 int class_unregister_type(const char *name)
228 struct obd_type *type = class_search_type(name);
232 CERROR("unknown obd type\n");
236 if (type->typ_refcnt) {
237 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
238 /* This is a bad situation, let's make the best of it */
239 /* Remove ops, but leave the name for debugging */
240 OBD_FREE_PTR(type->typ_dt_ops);
241 OBD_FREE_PTR(type->typ_md_ops);
245 if (type->typ_procroot) {
246 lprocfs_remove(&type->typ_procroot);
250 lu_device_type_fini(type->typ_lu);
252 cfs_spin_lock(&obd_types_lock);
253 cfs_list_del(&type->typ_chain);
254 cfs_spin_unlock(&obd_types_lock);
255 OBD_FREE(type->typ_name, strlen(name) + 1);
256 if (type->typ_dt_ops != NULL)
257 OBD_FREE_PTR(type->typ_dt_ops);
258 if (type->typ_md_ops != NULL)
259 OBD_FREE_PTR(type->typ_md_ops);
260 OBD_FREE(type, sizeof(*type));
262 } /* class_unregister_type */
263 EXPORT_SYMBOL(class_unregister_type);
266 * Create a new obd device.
268 * Find an empty slot in ::obd_devs[], create a new obd device in it.
270 * \param[in] type_name obd device type string.
271 * \param[in] name obd device name.
273 * \retval NULL if create fails, otherwise return the obd device
276 struct obd_device *class_newdev(const char *type_name, const char *name)
278 struct obd_device *result = NULL;
279 struct obd_device *newdev;
280 struct obd_type *type = NULL;
282 int new_obd_minor = 0;
285 if (strlen(name) >= MAX_OBD_NAME) {
286 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
287 RETURN(ERR_PTR(-EINVAL));
290 type = class_get_type(type_name);
292 CERROR("OBD: unknown type: %s\n", type_name);
293 RETURN(ERR_PTR(-ENODEV));
296 newdev = obd_device_alloc();
297 if (newdev == NULL) {
298 class_put_type(type);
299 RETURN(ERR_PTR(-ENOMEM));
301 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
303 cfs_write_lock(&obd_dev_lock);
304 for (i = 0; i < class_devno_max(); i++) {
305 struct obd_device *obd = class_num2obd(i);
307 if (obd && obd->obd_name &&
308 (strcmp(name, obd->obd_name) == 0)) {
309 CERROR("Device %s already exists at %d, won't add\n",
312 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
313 "%p obd_magic %08x != %08x\n", result,
314 result->obd_magic, OBD_DEVICE_MAGIC);
315 LASSERTF(result->obd_minor == new_obd_minor,
316 "%p obd_minor %d != %d\n", result,
317 result->obd_minor, new_obd_minor);
319 obd_devs[result->obd_minor] = NULL;
320 result->obd_name[0]='\0';
322 result = ERR_PTR(-EEXIST);
325 if (!result && !obd) {
327 result->obd_minor = i;
329 result->obd_type = type;
330 strncpy(result->obd_name, name,
331 sizeof(result->obd_name) - 1);
332 obd_devs[i] = result;
335 cfs_write_unlock(&obd_dev_lock);
337 if (result == NULL && i >= class_devno_max()) {
338 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
340 RETURN(ERR_PTR(-EOVERFLOW));
343 if (IS_ERR(result)) {
344 obd_device_free(newdev);
345 class_put_type(type);
347 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
348 result->obd_name, result);
353 void class_release_dev(struct obd_device *obd)
355 struct obd_type *obd_type = obd->obd_type;
357 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
358 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
359 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
360 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
361 LASSERT(obd_type != NULL);
363 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
364 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
366 cfs_write_lock(&obd_dev_lock);
367 obd_devs[obd->obd_minor] = NULL;
368 cfs_write_unlock(&obd_dev_lock);
369 obd_device_free(obd);
371 class_put_type(obd_type);
374 int class_name2dev(const char *name)
381 cfs_read_lock(&obd_dev_lock);
382 for (i = 0; i < class_devno_max(); i++) {
383 struct obd_device *obd = class_num2obd(i);
385 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
386 /* Make sure we finished attaching before we give
387 out any references */
388 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
389 if (obd->obd_attached) {
390 cfs_read_unlock(&obd_dev_lock);
396 cfs_read_unlock(&obd_dev_lock);
400 EXPORT_SYMBOL(class_name2dev);
402 struct obd_device *class_name2obd(const char *name)
404 int dev = class_name2dev(name);
406 if (dev < 0 || dev > class_devno_max())
408 return class_num2obd(dev);
410 EXPORT_SYMBOL(class_name2obd);
412 int class_uuid2dev(struct obd_uuid *uuid)
416 cfs_read_lock(&obd_dev_lock);
417 for (i = 0; i < class_devno_max(); i++) {
418 struct obd_device *obd = class_num2obd(i);
420 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
421 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
422 cfs_read_unlock(&obd_dev_lock);
426 cfs_read_unlock(&obd_dev_lock);
430 EXPORT_SYMBOL(class_uuid2dev);
432 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
434 int dev = class_uuid2dev(uuid);
437 return class_num2obd(dev);
439 EXPORT_SYMBOL(class_uuid2obd);
442 * Get obd device from ::obd_devs[]
444 * \param num [in] array index
446 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
447 * otherwise return the obd device there.
449 struct obd_device *class_num2obd(int num)
451 struct obd_device *obd = NULL;
453 if (num < class_devno_max()) {
458 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
459 "%p obd_magic %08x != %08x\n",
460 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
461 LASSERTF(obd->obd_minor == num,
462 "%p obd_minor %0d != %0d\n",
463 obd, obd->obd_minor, num);
468 EXPORT_SYMBOL(class_num2obd);
470 void class_obd_list(void)
475 cfs_read_lock(&obd_dev_lock);
476 for (i = 0; i < class_devno_max(); i++) {
477 struct obd_device *obd = class_num2obd(i);
481 if (obd->obd_stopping)
483 else if (obd->obd_set_up)
485 else if (obd->obd_attached)
489 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
490 i, status, obd->obd_type->typ_name,
491 obd->obd_name, obd->obd_uuid.uuid,
492 cfs_atomic_read(&obd->obd_refcount));
494 cfs_read_unlock(&obd_dev_lock);
498 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
499 specified, then only the client with that uuid is returned,
500 otherwise any client connected to the tgt is returned. */
501 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
502 const char * typ_name,
503 struct obd_uuid *grp_uuid)
507 cfs_read_lock(&obd_dev_lock);
508 for (i = 0; i < class_devno_max(); i++) {
509 struct obd_device *obd = class_num2obd(i);
513 if ((strncmp(obd->obd_type->typ_name, typ_name,
514 strlen(typ_name)) == 0)) {
515 if (obd_uuid_equals(tgt_uuid,
516 &obd->u.cli.cl_target_uuid) &&
517 ((grp_uuid)? obd_uuid_equals(grp_uuid,
518 &obd->obd_uuid) : 1)) {
519 cfs_read_unlock(&obd_dev_lock);
524 cfs_read_unlock(&obd_dev_lock);
528 EXPORT_SYMBOL(class_find_client_obd);
530 /* Iterate the obd_device list looking devices have grp_uuid. Start
531 searching at *next, and if a device is found, the next index to look
532 at is saved in *next. If next is NULL, then the first matching device
533 will always be returned. */
534 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
540 else if (*next >= 0 && *next < class_devno_max())
545 cfs_read_lock(&obd_dev_lock);
546 for (; i < class_devno_max(); i++) {
547 struct obd_device *obd = class_num2obd(i);
551 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
554 cfs_read_unlock(&obd_dev_lock);
558 cfs_read_unlock(&obd_dev_lock);
562 EXPORT_SYMBOL(class_devices_in_group);
565 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
566 * adjust sptlrpc settings accordingly.
568 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
570 struct obd_device *obd;
574 LASSERT(namelen > 0);
576 cfs_read_lock(&obd_dev_lock);
577 for (i = 0; i < class_devno_max(); i++) {
578 obd = class_num2obd(i);
580 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
583 /* only notify mdc, osc, mdt, ost */
584 type = obd->obd_type->typ_name;
585 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
586 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
587 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
588 strcmp(type, LUSTRE_OST_NAME) != 0)
591 if (strncmp(obd->obd_name, fsname, namelen))
594 class_incref(obd, __FUNCTION__, obd);
595 cfs_read_unlock(&obd_dev_lock);
596 rc2 = obd_set_info_async(obd->obd_self_export,
597 sizeof(KEY_SPTLRPC_CONF),
598 KEY_SPTLRPC_CONF, 0, NULL, NULL);
600 class_decref(obd, __FUNCTION__, obd);
601 cfs_read_lock(&obd_dev_lock);
603 cfs_read_unlock(&obd_dev_lock);
606 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
608 void obd_cleanup_caches(void)
613 if (obd_device_cachep) {
614 rc = cfs_mem_cache_destroy(obd_device_cachep);
615 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
616 obd_device_cachep = NULL;
619 rc = cfs_mem_cache_destroy(obdo_cachep);
620 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
624 rc = cfs_mem_cache_destroy(import_cachep);
625 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
626 import_cachep = NULL;
629 rc = cfs_mem_cache_destroy(capa_cachep);
630 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
636 int obd_init_caches(void)
640 LASSERT(obd_device_cachep == NULL);
641 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
642 sizeof(struct obd_device),
644 if (!obd_device_cachep)
647 LASSERT(obdo_cachep == NULL);
648 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
653 LASSERT(import_cachep == NULL);
654 import_cachep = cfs_mem_cache_create("ll_import_cache",
655 sizeof(struct obd_import),
660 LASSERT(capa_cachep == NULL);
661 capa_cachep = cfs_mem_cache_create("capa_cache",
662 sizeof(struct obd_capa), 0, 0);
668 obd_cleanup_caches();
673 /* map connection to client */
674 struct obd_export *class_conn2export(struct lustre_handle *conn)
676 struct obd_export *export;
680 CDEBUG(D_CACHE, "looking for null handle\n");
684 if (conn->cookie == -1) { /* this means assign a new connection */
685 CDEBUG(D_CACHE, "want a new connection\n");
689 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
690 export = class_handle2object(conn->cookie);
693 EXPORT_SYMBOL(class_conn2export);
695 struct obd_device *class_exp2obd(struct obd_export *exp)
701 EXPORT_SYMBOL(class_exp2obd);
703 struct obd_device *class_conn2obd(struct lustre_handle *conn)
705 struct obd_export *export;
706 export = class_conn2export(conn);
708 struct obd_device *obd = export->exp_obd;
709 class_export_put(export);
714 EXPORT_SYMBOL(class_conn2obd);
716 struct obd_import *class_exp2cliimp(struct obd_export *exp)
718 struct obd_device *obd = exp->exp_obd;
721 return obd->u.cli.cl_import;
723 EXPORT_SYMBOL(class_exp2cliimp);
725 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
727 struct obd_device *obd = class_conn2obd(conn);
730 return obd->u.cli.cl_import;
732 EXPORT_SYMBOL(class_conn2cliimp);
734 /* Export management functions */
735 static void class_export_destroy(struct obd_export *exp)
737 struct obd_device *obd = exp->exp_obd;
740 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
742 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
743 exp->exp_client_uuid.uuid, obd->obd_name);
745 LASSERT(obd != NULL);
747 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
748 if (exp->exp_connection)
749 ptlrpc_put_connection_superhack(exp->exp_connection);
751 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
752 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
753 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
754 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
755 obd_destroy_export(exp);
756 class_decref(obd, "export", exp);
758 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
762 static void export_handle_addref(void *export)
764 class_export_get(export);
767 struct obd_export *class_export_get(struct obd_export *exp)
769 cfs_atomic_inc(&exp->exp_refcount);
770 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
771 cfs_atomic_read(&exp->exp_refcount));
774 EXPORT_SYMBOL(class_export_get);
776 void class_export_put(struct obd_export *exp)
778 LASSERT(exp != NULL);
779 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
780 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
781 cfs_atomic_read(&exp->exp_refcount) - 1);
783 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
784 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
785 CDEBUG(D_IOCTL, "final put %p/%s\n",
786 exp, exp->exp_client_uuid.uuid);
788 /* release nid stat refererence */
789 lprocfs_exp_cleanup(exp);
791 obd_zombie_export_add(exp);
794 EXPORT_SYMBOL(class_export_put);
796 /* Creates a new export, adds it to the hash table, and returns a
797 * pointer to it. The refcount is 2: one for the hash reference, and
798 * one for the pointer returned by this function. */
799 struct obd_export *class_new_export(struct obd_device *obd,
800 struct obd_uuid *cluuid)
802 struct obd_export *export;
803 cfs_hash_t *hash = NULL;
807 OBD_ALLOC_PTR(export);
809 return ERR_PTR(-ENOMEM);
811 export->exp_conn_cnt = 0;
812 export->exp_lock_hash = NULL;
813 cfs_atomic_set(&export->exp_refcount, 2);
814 cfs_atomic_set(&export->exp_rpc_count, 0);
815 cfs_atomic_set(&export->exp_cb_count, 0);
816 cfs_atomic_set(&export->exp_locks_count, 0);
817 #if LUSTRE_TRACKS_LOCK_EXP_REFS
818 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
819 cfs_spin_lock_init(&export->exp_locks_list_guard);
821 cfs_atomic_set(&export->exp_replay_count, 0);
822 export->exp_obd = obd;
823 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
824 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
825 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
826 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
827 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
828 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
829 class_handle_hash(&export->exp_handle, export_handle_addref);
830 export->exp_last_request_time = cfs_time_current_sec();
831 cfs_spin_lock_init(&export->exp_lock);
832 cfs_spin_lock_init(&export->exp_rpc_lock);
833 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
834 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
835 cfs_spin_lock_init(&export->exp_bl_list_lock);
836 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
838 export->exp_sp_peer = LUSTRE_SP_ANY;
839 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
840 export->exp_client_uuid = *cluuid;
841 obd_init_export(export);
843 cfs_spin_lock(&obd->obd_dev_lock);
844 /* shouldn't happen, but might race */
845 if (obd->obd_stopping)
846 GOTO(exit_unlock, rc = -ENODEV);
848 hash = cfs_hash_getref(obd->obd_uuid_hash);
850 GOTO(exit_unlock, rc = -ENODEV);
851 cfs_spin_unlock(&obd->obd_dev_lock);
853 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
854 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
856 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
857 obd->obd_name, cluuid->uuid, rc);
858 GOTO(exit_err, rc = -EALREADY);
862 cfs_spin_lock(&obd->obd_dev_lock);
863 if (obd->obd_stopping) {
864 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
865 GOTO(exit_unlock, rc = -ENODEV);
868 class_incref(obd, "export", export);
869 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
870 cfs_list_add_tail(&export->exp_obd_chain_timed,
871 &export->exp_obd->obd_exports_timed);
872 export->exp_obd->obd_num_exports++;
873 cfs_spin_unlock(&obd->obd_dev_lock);
874 cfs_hash_putref(hash);
878 cfs_spin_unlock(&obd->obd_dev_lock);
881 cfs_hash_putref(hash);
882 class_handle_unhash(&export->exp_handle);
883 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
884 obd_destroy_export(export);
885 OBD_FREE_PTR(export);
888 EXPORT_SYMBOL(class_new_export);
890 void class_unlink_export(struct obd_export *exp)
892 class_handle_unhash(&exp->exp_handle);
894 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
895 /* delete an uuid-export hashitem from hashtables */
896 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
897 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
898 &exp->exp_client_uuid,
899 &exp->exp_uuid_hash);
901 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
902 cfs_list_del_init(&exp->exp_obd_chain_timed);
903 exp->exp_obd->obd_num_exports--;
904 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
905 class_export_put(exp);
907 EXPORT_SYMBOL(class_unlink_export);
909 /* Import management functions */
910 void class_import_destroy(struct obd_import *imp)
914 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
915 imp->imp_obd->obd_name);
917 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
919 ptlrpc_put_connection_superhack(imp->imp_connection);
921 while (!cfs_list_empty(&imp->imp_conn_list)) {
922 struct obd_import_conn *imp_conn;
924 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
925 struct obd_import_conn, oic_item);
926 cfs_list_del_init(&imp_conn->oic_item);
927 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
928 OBD_FREE(imp_conn, sizeof(*imp_conn));
931 LASSERT(imp->imp_sec == NULL);
932 class_decref(imp->imp_obd, "import", imp);
933 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
937 static void import_handle_addref(void *import)
939 class_import_get(import);
942 struct obd_import *class_import_get(struct obd_import *import)
944 cfs_atomic_inc(&import->imp_refcount);
945 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
946 cfs_atomic_read(&import->imp_refcount),
947 import->imp_obd->obd_name);
950 EXPORT_SYMBOL(class_import_get);
952 void class_import_put(struct obd_import *imp)
956 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
957 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
959 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
960 cfs_atomic_read(&imp->imp_refcount) - 1,
961 imp->imp_obd->obd_name);
963 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
964 CDEBUG(D_INFO, "final put import %p\n", imp);
965 obd_zombie_import_add(imp);
970 EXPORT_SYMBOL(class_import_put);
972 static void init_imp_at(struct imp_at *at) {
974 at_init(&at->iat_net_latency, 0, 0);
975 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
976 /* max service estimates are tracked on the server side, so
977 don't use the AT history here, just use the last reported
978 val. (But keep hist for proc histogram, worst_ever) */
979 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
984 struct obd_import *class_new_import(struct obd_device *obd)
986 struct obd_import *imp;
988 OBD_ALLOC(imp, sizeof(*imp));
992 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
993 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
994 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
995 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
996 cfs_spin_lock_init(&imp->imp_lock);
997 imp->imp_last_success_conn = 0;
998 imp->imp_state = LUSTRE_IMP_NEW;
999 imp->imp_obd = class_incref(obd, "import", imp);
1000 cfs_sema_init(&imp->imp_sec_mutex, 1);
1001 cfs_waitq_init(&imp->imp_recovery_waitq);
1003 cfs_atomic_set(&imp->imp_refcount, 2);
1004 cfs_atomic_set(&imp->imp_unregistering, 0);
1005 cfs_atomic_set(&imp->imp_inflight, 0);
1006 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1007 cfs_atomic_set(&imp->imp_inval_count, 0);
1008 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1009 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1010 class_handle_hash(&imp->imp_handle, import_handle_addref);
1011 init_imp_at(&imp->imp_at);
1013 /* the default magic is V2, will be used in connect RPC, and
1014 * then adjusted according to the flags in request/reply. */
1015 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1019 EXPORT_SYMBOL(class_new_import);
1021 void class_destroy_import(struct obd_import *import)
1023 LASSERT(import != NULL);
1024 LASSERT(import != LP_POISON);
1026 class_handle_unhash(&import->imp_handle);
1028 cfs_spin_lock(&import->imp_lock);
1029 import->imp_generation++;
1030 cfs_spin_unlock(&import->imp_lock);
1031 class_import_put(import);
1033 EXPORT_SYMBOL(class_destroy_import);
1035 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1037 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1039 cfs_spin_lock(&exp->exp_locks_list_guard);
1041 LASSERT(lock->l_exp_refs_nr >= 0);
1043 if (lock->l_exp_refs_target != NULL &&
1044 lock->l_exp_refs_target != exp) {
1045 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1046 exp, lock, lock->l_exp_refs_target);
1048 if ((lock->l_exp_refs_nr ++) == 0) {
1049 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1050 lock->l_exp_refs_target = exp;
1052 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1053 lock, exp, lock->l_exp_refs_nr);
1054 cfs_spin_unlock(&exp->exp_locks_list_guard);
1056 EXPORT_SYMBOL(__class_export_add_lock_ref);
1058 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1060 cfs_spin_lock(&exp->exp_locks_list_guard);
1061 LASSERT(lock->l_exp_refs_nr > 0);
1062 if (lock->l_exp_refs_target != exp) {
1063 LCONSOLE_WARN("lock %p, "
1064 "mismatching export pointers: %p, %p\n",
1065 lock, lock->l_exp_refs_target, exp);
1067 if (-- lock->l_exp_refs_nr == 0) {
1068 cfs_list_del_init(&lock->l_exp_refs_link);
1069 lock->l_exp_refs_target = NULL;
1071 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1072 lock, exp, lock->l_exp_refs_nr);
1073 cfs_spin_unlock(&exp->exp_locks_list_guard);
1075 EXPORT_SYMBOL(__class_export_del_lock_ref);
1078 /* A connection defines an export context in which preallocation can
1079 be managed. This releases the export pointer reference, and returns
1080 the export handle, so the export refcount is 1 when this function
1082 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1083 struct obd_uuid *cluuid)
1085 struct obd_export *export;
1086 LASSERT(conn != NULL);
1087 LASSERT(obd != NULL);
1088 LASSERT(cluuid != NULL);
1091 export = class_new_export(obd, cluuid);
1093 RETURN(PTR_ERR(export));
1095 conn->cookie = export->exp_handle.h_cookie;
1096 class_export_put(export);
1098 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1099 cluuid->uuid, conn->cookie);
1102 EXPORT_SYMBOL(class_connect);
1104 /* if export is involved in recovery then clean up related things */
1105 void class_export_recovery_cleanup(struct obd_export *exp)
1107 struct obd_device *obd = exp->exp_obd;
1109 cfs_spin_lock(&obd->obd_recovery_task_lock);
1110 if (exp->exp_delayed)
1111 obd->obd_delayed_clients--;
1112 if (obd->obd_recovering && exp->exp_in_recovery) {
1113 cfs_spin_lock(&exp->exp_lock);
1114 exp->exp_in_recovery = 0;
1115 cfs_spin_unlock(&exp->exp_lock);
1116 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1117 cfs_atomic_dec(&obd->obd_connected_clients);
1119 cfs_spin_unlock(&obd->obd_recovery_task_lock);
1120 /** Cleanup req replay fields */
1121 if (exp->exp_req_replay_needed) {
1122 cfs_spin_lock(&exp->exp_lock);
1123 exp->exp_req_replay_needed = 0;
1124 cfs_spin_unlock(&exp->exp_lock);
1125 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1126 cfs_atomic_dec(&obd->obd_req_replay_clients);
1128 /** Cleanup lock replay data */
1129 if (exp->exp_lock_replay_needed) {
1130 cfs_spin_lock(&exp->exp_lock);
1131 exp->exp_lock_replay_needed = 0;
1132 cfs_spin_unlock(&exp->exp_lock);
1133 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1134 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1138 /* This function removes 1-3 references from the export:
1139 * 1 - for export pointer passed
1140 * and if disconnect really need
1141 * 2 - removing from hash
1142 * 3 - in client_unlink_export
1143 * The export pointer passed to this function can destroyed */
1144 int class_disconnect(struct obd_export *export)
1146 int already_disconnected;
1149 if (export == NULL) {
1150 CWARN("attempting to free NULL export %p\n", export);
1154 cfs_spin_lock(&export->exp_lock);
1155 already_disconnected = export->exp_disconnected;
1156 export->exp_disconnected = 1;
1157 cfs_spin_unlock(&export->exp_lock);
1159 /* class_cleanup(), abort_recovery(), and class_fail_export()
1160 * all end up in here, and if any of them race we shouldn't
1161 * call extra class_export_puts(). */
1162 if (already_disconnected) {
1163 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1164 GOTO(no_disconn, already_disconnected);
1167 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1168 export->exp_handle.h_cookie);
1170 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1171 cfs_hash_del(export->exp_obd->obd_nid_hash,
1172 &export->exp_connection->c_peer.nid,
1173 &export->exp_nid_hash);
1175 class_export_recovery_cleanup(export);
1176 class_unlink_export(export);
1178 class_export_put(export);
1181 EXPORT_SYMBOL(class_disconnect);
1183 /* Return non-zero for a fully connected export */
1184 int class_connected_export(struct obd_export *exp)
1188 cfs_spin_lock(&exp->exp_lock);
1189 connected = (exp->exp_conn_cnt > 0);
1190 cfs_spin_unlock(&exp->exp_lock);
1195 EXPORT_SYMBOL(class_connected_export);
1197 static void class_disconnect_export_list(cfs_list_t *list,
1198 enum obd_option flags)
1201 struct obd_export *exp;
1204 /* It's possible that an export may disconnect itself, but
1205 * nothing else will be added to this list. */
1206 while (!cfs_list_empty(list)) {
1207 exp = cfs_list_entry(list->next, struct obd_export,
1209 /* need for safe call CDEBUG after obd_disconnect */
1210 class_export_get(exp);
1212 cfs_spin_lock(&exp->exp_lock);
1213 exp->exp_flags = flags;
1214 cfs_spin_unlock(&exp->exp_lock);
1216 if (obd_uuid_equals(&exp->exp_client_uuid,
1217 &exp->exp_obd->obd_uuid)) {
1219 "exp %p export uuid == obd uuid, don't discon\n",
1221 /* Need to delete this now so we don't end up pointing
1222 * to work_list later when this export is cleaned up. */
1223 cfs_list_del_init(&exp->exp_obd_chain);
1224 class_export_put(exp);
1228 class_export_get(exp);
1229 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1230 "last request at "CFS_TIME_T"\n",
1231 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1232 exp, exp->exp_last_request_time);
1233 /* release one export reference anyway */
1234 rc = obd_disconnect(exp);
1236 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1237 obd_export_nid2str(exp), exp, rc);
1238 class_export_put(exp);
1243 void class_disconnect_exports(struct obd_device *obd)
1245 cfs_list_t work_list;
1248 /* Move all of the exports from obd_exports to a work list, en masse. */
1249 CFS_INIT_LIST_HEAD(&work_list);
1250 cfs_spin_lock(&obd->obd_dev_lock);
1251 cfs_list_splice_init(&obd->obd_exports, &work_list);
1252 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1253 cfs_spin_unlock(&obd->obd_dev_lock);
1255 if (!cfs_list_empty(&work_list)) {
1256 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1257 "disconnecting them\n", obd->obd_minor, obd);
1258 class_disconnect_export_list(&work_list,
1259 exp_flags_from_obd(obd));
1261 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1262 obd->obd_minor, obd);
1265 EXPORT_SYMBOL(class_disconnect_exports);
1267 /* Remove exports that have not completed recovery.
1269 void class_disconnect_stale_exports(struct obd_device *obd,
1270 int (*test_export)(struct obd_export *))
1272 cfs_list_t work_list;
1273 cfs_list_t *pos, *n;
1274 struct obd_export *exp;
1278 CFS_INIT_LIST_HEAD(&work_list);
1279 cfs_spin_lock(&obd->obd_dev_lock);
1280 cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1281 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1282 if (test_export(exp))
1285 /* don't count self-export as client */
1286 if (obd_uuid_equals(&exp->exp_client_uuid,
1287 &exp->exp_obd->obd_uuid))
1290 cfs_list_move(&exp->exp_obd_chain, &work_list);
1292 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1293 obd->obd_name, exp->exp_client_uuid.uuid,
1294 exp->exp_connection == NULL ? "<unknown>" :
1295 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1296 print_export_data(exp, "EVICTING", 0);
1298 cfs_spin_unlock(&obd->obd_dev_lock);
1301 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1302 obd->obd_name, evicted);
1303 obd->obd_stale_clients += evicted;
1305 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1306 OBD_OPT_ABORT_RECOV);
1309 EXPORT_SYMBOL(class_disconnect_stale_exports);
1311 void class_fail_export(struct obd_export *exp)
1313 int rc, already_failed;
1315 cfs_spin_lock(&exp->exp_lock);
1316 already_failed = exp->exp_failed;
1317 exp->exp_failed = 1;
1318 cfs_spin_unlock(&exp->exp_lock);
1320 if (already_failed) {
1321 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1322 exp, exp->exp_client_uuid.uuid);
1326 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1327 exp, exp->exp_client_uuid.uuid);
1329 if (obd_dump_on_timeout)
1330 libcfs_debug_dumplog();
1332 /* Most callers into obd_disconnect are removing their own reference
1333 * (request, for example) in addition to the one from the hash table.
1334 * We don't have such a reference here, so make one. */
1335 class_export_get(exp);
1336 rc = obd_disconnect(exp);
1338 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1340 CDEBUG(D_HA, "disconnected export %p/%s\n",
1341 exp, exp->exp_client_uuid.uuid);
1343 EXPORT_SYMBOL(class_fail_export);
1345 char *obd_export_nid2str(struct obd_export *exp)
1347 if (exp->exp_connection != NULL)
1348 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1352 EXPORT_SYMBOL(obd_export_nid2str);
1354 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1356 struct obd_export *doomed_exp = NULL;
1357 int exports_evicted = 0;
1359 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1362 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1363 if (doomed_exp == NULL)
1366 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1367 "nid %s found, wanted nid %s, requested nid %s\n",
1368 obd_export_nid2str(doomed_exp),
1369 libcfs_nid2str(nid_key), nid);
1370 LASSERTF(doomed_exp != obd->obd_self_export,
1371 "self-export is hashed by NID?\n");
1373 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1374 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1376 class_fail_export(doomed_exp);
1377 class_export_put(doomed_exp);
1380 if (!exports_evicted)
1381 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1382 obd->obd_name, nid);
1383 return exports_evicted;
1385 EXPORT_SYMBOL(obd_export_evict_by_nid);
1387 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1389 struct obd_export *doomed_exp = NULL;
1390 struct obd_uuid doomed_uuid;
1391 int exports_evicted = 0;
1393 obd_str2uuid(&doomed_uuid, uuid);
1394 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1395 CERROR("%s: can't evict myself\n", obd->obd_name);
1396 return exports_evicted;
1399 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1401 if (doomed_exp == NULL) {
1402 CERROR("%s: can't disconnect %s: no exports found\n",
1403 obd->obd_name, uuid);
1405 CWARN("%s: evicting %s at adminstrative request\n",
1406 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1407 class_fail_export(doomed_exp);
1408 class_export_put(doomed_exp);
1412 return exports_evicted;
1414 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1416 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1417 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1418 EXPORT_SYMBOL(class_export_dump_hook);
1421 static void print_export_data(struct obd_export *exp, const char *status,
1424 struct ptlrpc_reply_state *rs;
1425 struct ptlrpc_reply_state *first_reply = NULL;
1428 cfs_spin_lock(&exp->exp_lock);
1429 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1435 cfs_spin_unlock(&exp->exp_lock);
1437 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1438 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1439 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1440 cfs_atomic_read(&exp->exp_rpc_count),
1441 cfs_atomic_read(&exp->exp_cb_count),
1442 cfs_atomic_read(&exp->exp_locks_count),
1443 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1444 nreplies, first_reply, nreplies > 3 ? "..." : "",
1445 exp->exp_last_committed);
1446 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1447 if (locks && class_export_dump_hook != NULL)
1448 class_export_dump_hook(exp);
1452 void dump_exports(struct obd_device *obd, int locks)
1454 struct obd_export *exp;
1456 cfs_spin_lock(&obd->obd_dev_lock);
1457 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1458 print_export_data(exp, "ACTIVE", locks);
1459 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1460 print_export_data(exp, "UNLINKED", locks);
1461 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1462 print_export_data(exp, "DELAYED", locks);
1463 cfs_spin_unlock(&obd->obd_dev_lock);
1464 cfs_spin_lock(&obd_zombie_impexp_lock);
1465 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1466 print_export_data(exp, "ZOMBIE", locks);
1467 cfs_spin_unlock(&obd_zombie_impexp_lock);
1469 EXPORT_SYMBOL(dump_exports);
1471 void obd_exports_barrier(struct obd_device *obd)
1474 LASSERT(cfs_list_empty(&obd->obd_exports));
1475 cfs_spin_lock(&obd->obd_dev_lock);
1476 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1477 cfs_spin_unlock(&obd->obd_dev_lock);
1478 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1479 cfs_time_seconds(waited));
1480 if (waited > 5 && IS_PO2(waited)) {
1481 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1482 "more than %d seconds. "
1483 "The obd refcount = %d. Is it stuck?\n",
1484 obd->obd_name, waited,
1485 cfs_atomic_read(&obd->obd_refcount));
1486 dump_exports(obd, 1);
1489 cfs_spin_lock(&obd->obd_dev_lock);
1491 cfs_spin_unlock(&obd->obd_dev_lock);
1493 EXPORT_SYMBOL(obd_exports_barrier);
1495 /* Total amount of zombies to be destroyed */
1496 static int zombies_count = 0;
1499 * kill zombie imports and exports
1501 void obd_zombie_impexp_cull(void)
1503 struct obd_import *import;
1504 struct obd_export *export;
1508 cfs_spin_lock(&obd_zombie_impexp_lock);
1511 if (!cfs_list_empty(&obd_zombie_imports)) {
1512 import = cfs_list_entry(obd_zombie_imports.next,
1515 cfs_list_del_init(&import->imp_zombie_chain);
1519 if (!cfs_list_empty(&obd_zombie_exports)) {
1520 export = cfs_list_entry(obd_zombie_exports.next,
1523 cfs_list_del_init(&export->exp_obd_chain);
1526 cfs_spin_unlock(&obd_zombie_impexp_lock);
1528 if (import != NULL) {
1529 class_import_destroy(import);
1530 cfs_spin_lock(&obd_zombie_impexp_lock);
1532 cfs_spin_unlock(&obd_zombie_impexp_lock);
1535 if (export != NULL) {
1536 class_export_destroy(export);
1537 cfs_spin_lock(&obd_zombie_impexp_lock);
1539 cfs_spin_unlock(&obd_zombie_impexp_lock);
1543 } while (import != NULL || export != NULL);
1547 static cfs_completion_t obd_zombie_start;
1548 static cfs_completion_t obd_zombie_stop;
1549 static unsigned long obd_zombie_flags;
1550 static cfs_waitq_t obd_zombie_waitq;
1551 static pid_t obd_zombie_pid;
1554 OBD_ZOMBIE_STOP = 1 << 1
1558 * check for work for kill zombie import/export thread.
1560 static int obd_zombie_impexp_check(void *arg)
1564 cfs_spin_lock(&obd_zombie_impexp_lock);
1565 rc = (zombies_count == 0) &&
1566 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1567 cfs_spin_unlock(&obd_zombie_impexp_lock);
1573 * Add export to the obd_zombe thread and notify it.
1575 static void obd_zombie_export_add(struct obd_export *exp) {
1576 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1577 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1578 cfs_list_del_init(&exp->exp_obd_chain);
1579 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1580 cfs_spin_lock(&obd_zombie_impexp_lock);
1582 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1583 cfs_spin_unlock(&obd_zombie_impexp_lock);
1585 obd_zombie_impexp_notify();
1589 * Add import to the obd_zombe thread and notify it.
1591 static void obd_zombie_import_add(struct obd_import *imp) {
1592 LASSERT(imp->imp_sec == NULL);
1593 cfs_spin_lock(&obd_zombie_impexp_lock);
1594 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1596 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1597 cfs_spin_unlock(&obd_zombie_impexp_lock);
1599 obd_zombie_impexp_notify();
1603 * notify import/export destroy thread about new zombie.
1605 static void obd_zombie_impexp_notify(void)
1608 * Make sure obd_zomebie_impexp_thread get this notification.
1609 * It is possible this signal only get by obd_zombie_barrier, and
1610 * barrier gulps this notification and sleeps away and hangs ensues
1612 cfs_waitq_broadcast(&obd_zombie_waitq);
1616 * check whether obd_zombie is idle
1618 static int obd_zombie_is_idle(void)
1622 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1623 cfs_spin_lock(&obd_zombie_impexp_lock);
1624 rc = (zombies_count == 0);
1625 cfs_spin_unlock(&obd_zombie_impexp_lock);
1630 * wait when obd_zombie import/export queues become empty
1632 void obd_zombie_barrier(void)
1634 struct l_wait_info lwi = { 0 };
1636 if (obd_zombie_pid == cfs_curproc_pid())
1637 /* don't wait for myself */
1639 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1641 EXPORT_SYMBOL(obd_zombie_barrier);
1646 * destroy zombie export/import thread.
1648 static int obd_zombie_impexp_thread(void *unused)
1652 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1653 cfs_complete(&obd_zombie_start);
1657 cfs_complete(&obd_zombie_start);
1659 obd_zombie_pid = cfs_curproc_pid();
1661 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1662 struct l_wait_info lwi = { 0 };
1664 l_wait_event(obd_zombie_waitq,
1665 !obd_zombie_impexp_check(NULL), &lwi);
1666 obd_zombie_impexp_cull();
1669 * Notify obd_zombie_barrier callers that queues
1672 cfs_waitq_signal(&obd_zombie_waitq);
1675 cfs_complete(&obd_zombie_stop);
1680 #else /* ! KERNEL */
1682 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1683 static void *obd_zombie_impexp_work_cb;
1684 static void *obd_zombie_impexp_idle_cb;
1686 int obd_zombie_impexp_kill(void *arg)
1690 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1691 obd_zombie_impexp_cull();
1694 cfs_atomic_dec(&zombie_recur);
1701 * start destroy zombie import/export thread
1703 int obd_zombie_impexp_init(void)
1707 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1708 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1709 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1710 cfs_init_completion(&obd_zombie_start);
1711 cfs_init_completion(&obd_zombie_stop);
1712 cfs_waitq_init(&obd_zombie_waitq);
1716 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1720 cfs_wait_for_completion(&obd_zombie_start);
1723 obd_zombie_impexp_work_cb =
1724 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1725 &obd_zombie_impexp_kill, NULL);
1727 obd_zombie_impexp_idle_cb =
1728 liblustre_register_idle_callback("obd_zombi_impexp_check",
1729 &obd_zombie_impexp_check, NULL);
1735 * stop destroy zombie import/export thread
1737 void obd_zombie_impexp_stop(void)
1739 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1740 obd_zombie_impexp_notify();
1742 cfs_wait_for_completion(&obd_zombie_stop);
1744 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1745 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1749 /***** Kernel-userspace comm helpers *******/
1751 /* Get length of entire message, including header */
1752 int kuc_len(int payload_len)
1754 return sizeof(struct kuc_hdr) + payload_len;
1756 EXPORT_SYMBOL(kuc_len);
1758 /* Get a pointer to kuc header, given a ptr to the payload
1759 * @param p Pointer to payload area
1760 * @returns Pointer to kuc header
1762 struct kuc_hdr * kuc_ptr(void *p)
1764 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1765 LASSERT(lh->kuc_magic == KUC_MAGIC);
1768 EXPORT_SYMBOL(kuc_ptr);
1770 /* Test if payload is part of kuc message
1771 * @param p Pointer to payload area
1774 int kuc_ispayload(void *p)
1776 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1778 if (kh->kuc_magic == KUC_MAGIC)
1783 EXPORT_SYMBOL(kuc_ispayload);
1785 /* Alloc space for a message, and fill in header
1786 * @return Pointer to payload area
1788 void *kuc_alloc(int payload_len, int transport, int type)
1791 int len = kuc_len(payload_len);
1795 return ERR_PTR(-ENOMEM);
1797 lh->kuc_magic = KUC_MAGIC;
1798 lh->kuc_transport = transport;
1799 lh->kuc_msgtype = type;
1800 lh->kuc_msglen = len;
1802 return (void *)(lh + 1);
1804 EXPORT_SYMBOL(kuc_alloc);
1806 /* Takes pointer to payload area */
1807 inline void kuc_free(void *p, int payload_len)
1809 struct kuc_hdr *lh = kuc_ptr(p);
1810 OBD_FREE(lh, kuc_len(payload_len));
1812 EXPORT_SYMBOL(kuc_free);