1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
32 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
39 * lustre/obdclass/genops.c
41 * These are the only exported functions, they provide some generic
42 * infrastructure for managing object devices
45 #define DEBUG_SUBSYSTEM S_CLASS
47 #include <liblustre.h>
50 #include <obd_class.h>
51 #include <lprocfs_status.h>
/* List of registered obd types and the spinlock taken around walks and
 * updates of that list (see class_search_type(), class_register_type()). */
53 extern cfs_list_t obd_types;
54 cfs_spinlock_t obd_types_lock;
/* Memory caches; set up in obd_init_caches() and torn down in
 * obd_cleanup_caches(). */
56 cfs_mem_cache_t *obd_device_cachep;
57 cfs_mem_cache_t *obdo_cachep;
58 EXPORT_SYMBOL(obdo_cachep);
59 cfs_mem_cache_t *import_cachep;
/* Zombie import/export lists and their lock.  NOTE(review): the code that
 * fills and drains these lists is not visible in this part of the file;
 * presumably obd_zombie_export_add()/obd_zombie_import_add() queue onto
 * them -- confirm. */
61 cfs_list_t obd_zombie_imports;
62 cfs_list_t obd_zombie_exports;
63 cfs_spinlock_t obd_zombie_impexp_lock;
64 static void obd_zombie_impexp_notify(void);
65 static void obd_zombie_export_add(struct obd_export *exp);
66 static void obd_zombie_import_add(struct obd_import *imp);
67 static void print_export_data(struct obd_export *exp,
68 const char *status, int locks);
/* Function pointer used by class_export_destroy() and class_import_destroy()
 * to drop ptlrpc connections; it is assigned outside this part of the file. */
70 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
73 /* support functions: we could use inter-module communication, but this
74 * is more portable to other OSes */
/* Allocate an obd_device from obd_device_cachep (CFS_ALLOC_IO) and stamp it
 * with OBD_DEVICE_MAGIC.
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this listing. */
76 static struct obd_device *obd_device_alloc(void)
78 struct obd_device *obd;
80 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
82 obd->obd_magic = OBD_DEVICE_MAGIC;
/* Give an obd_device back to obd_device_cachep.
 * The magic is asserted first; a namespace pointer that is still set is
 * reported through CERROR before the reference tracker is finalized and the
 * slab object is freed. */
87 static void obd_device_free(struct obd_device *obd)
90 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
91 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
92 if (obd->obd_namespace != NULL) {
93 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
94 obd, obd->obd_namespace, obd->obd_force);
97 lu_ref_fini(&obd->obd_reference);
98 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/* Search the obd_types list for an entry whose typ_name equals \a name.
 * The list is walked under obd_types_lock; the lock is released both on the
 * match path and after an unsuccessful walk.
 * NOTE(review): the return statements are not visible in this listing. */
101 struct obd_type *class_search_type(const char *name)
104 struct obd_type *type;
106 cfs_spin_lock(&obd_types_lock);
107 cfs_list_for_each(tmp, &obd_types) {
108 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
109 if (strcmp(type->typ_name, name) == 0) {
110 cfs_spin_unlock(&obd_types_lock);
114 cfs_spin_unlock(&obd_types_lock);
/* Look up an obd type by name and take a module reference on its owner.
 * With HAVE_MODULE_LOADING_SUPPORT, a module named after the type is
 * requested and the type list is searched a second time.
 * NOTE(review): the conditions guarding the module load and the locked
 * section, and the return statement, are not visible in this listing. */
118 struct obd_type *class_get_type(const char *name)
120 struct obd_type *type = class_search_type(name);
122 #ifdef HAVE_MODULE_LOADING_SUPPORT
124 const char *modname = name;
/* cfs_request_module() returning 0 is treated as a successful load */
125 if (!cfs_request_module("%s", modname)) {
126 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127 type = class_search_type(name);
129 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
/* the module reference is taken while holding the per-type lock */
135 cfs_spin_lock(&type->obd_type_lock);
137 cfs_try_module_get(type->typ_dt_ops->o_owner);
138 cfs_spin_unlock(&type->obd_type_lock);
/* Counterpart of class_get_type(): drop the module reference on
 * typ_dt_ops->o_owner while holding the per-type lock.
 * NOTE(review): other statements inside the locked section are not visible
 * in this listing. */
143 void class_put_type(struct obd_type *type)
146 cfs_spin_lock(&type->obd_type_lock);
148 cfs_module_put(type->typ_dt_ops->o_owner);
149 cfs_spin_unlock(&type->obd_type_lock);
/* Upper bound (exclusive) asserted on the length of a type name. */
152 #define CLASS_MAX_NAME 1024
/* Register a new obd type under \a name.
 * Private copies of \a dt_ops, \a md_ops and \a name are allocated and
 * attached to a freshly allocated obd_type, a proc directory is registered,
 * \a ldt is initialized, and the type is linked onto obd_types under
 * obd_types_lock.  The tail of the function frees whatever was allocated.
 * NOTE(review): labels, several guards and all return statements are not
 * visible in this listing, so the exact error codes cannot be stated. */
154 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
155 struct lprocfs_vars *vars, const char *name,
156 struct lu_device_type *ldt)
158 struct obd_type *type;
163 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* refuse a second registration under the same name */
165 if (class_search_type(name)) {
166 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
171 OBD_ALLOC(type, sizeof(*type));
175 OBD_ALLOC_PTR(type->typ_dt_ops);
176 OBD_ALLOC_PTR(type->typ_md_ops);
177 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* all three allocations are checked together */
179 if (type->typ_dt_ops == NULL ||
180 type->typ_md_ops == NULL ||
181 type->typ_name == NULL)
/* the ops tables are copied by value, so the caller's tables are not kept */
184 *(type->typ_dt_ops) = *dt_ops;
185 /* md_ops is optional */
187 *(type->typ_md_ops) = *md_ops;
188 strcpy(type->typ_name, name);
189 cfs_spin_lock_init(&type->obd_type_lock);
192 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
194 if (IS_ERR(type->typ_procroot)) {
195 rc = PTR_ERR(type->typ_procroot);
196 type->typ_procroot = NULL;
202 rc = lu_device_type_init(ldt);
207 cfs_spin_lock(&obd_types_lock);
208 cfs_list_add(&type->typ_chain, &obd_types);
209 cfs_spin_unlock(&obd_types_lock);
/* cleanup: typ_name was sized from strlen(name) + 1, so it is freed with
 * the same size */
214 if (type->typ_name != NULL)
215 OBD_FREE(type->typ_name, strlen(name) + 1);
216 if (type->typ_md_ops != NULL)
217 OBD_FREE_PTR(type->typ_md_ops);
218 if (type->typ_dt_ops != NULL)
219 OBD_FREE_PTR(type->typ_dt_ops);
220 OBD_FREE(type, sizeof(*type));
/* Unregister the obd type called \a name.
 * On the normal path the proc entry is removed, the lu device type is
 * finalized, the type is unlinked from obd_types under obd_types_lock and
 * its name, ops copies and descriptor are freed.
 * NOTE(review): guards and return statements are not visible in this
 * listing. */
224 int class_unregister_type(const char *name)
226 struct obd_type *type = class_search_type(name);
230 CERROR("unknown obd type\n");
/* NOTE(review): in this branch both ops tables are freed but, in the lines
 * visible here, the pointers are not cleared and the type is not unlinked;
 * class_put_type() dereferences typ_dt_ops, so a later put on this type
 * would touch freed memory -- confirm against the full function. */
234 if (type->typ_refcnt) {
235 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
236 /* This is a bad situation, let's make the best of it */
237 /* Remove ops, but leave the name for debugging */
238 OBD_FREE_PTR(type->typ_dt_ops);
239 OBD_FREE_PTR(type->typ_md_ops);
243 if (type->typ_procroot) {
244 lprocfs_remove(&type->typ_procroot);
248 lu_device_type_fini(type->typ_lu);
250 cfs_spin_lock(&obd_types_lock);
251 cfs_list_del(&type->typ_chain);
252 cfs_spin_unlock(&obd_types_lock);
/* size mirrors the strlen(name) + 1 allocation in class_register_type() */
253 OBD_FREE(type->typ_name, strlen(name) + 1);
254 if (type->typ_dt_ops != NULL)
255 OBD_FREE_PTR(type->typ_dt_ops);
256 if (type->typ_md_ops != NULL)
257 OBD_FREE_PTR(type->typ_md_ops);
258 OBD_FREE(type, sizeof(*type));
260 } /* class_unregister_type */
263 /* Create a new obd device.
265 * Find an empty slot in ::obd_devs[], create a new obd device in it.
267 * \param[in] type_name obd device type string.
268 * \param[in] name obd device name.
270 * \retval ERR_PTR(-errno) if create fails, otherwise return the obd device */
/* Create an obd device of type \a type_name called \a name and place it in
 * the first free slot of obd_devs[].
 * Failures visible here are reported as ERR_PTR values: -EINVAL (name too
 * long), -ENODEV (unknown type), -ENOMEM (allocation), -EEXIST (name already
 * in use) and -EOVERFLOW (no free slot).
 * NOTE(review): several guards, assignments and the final return are not
 * visible in this listing. */
273 struct obd_device *class_newdev(const char *type_name, const char *name)
275 struct obd_device *result = NULL;
276 struct obd_device *newdev;
277 struct obd_type *type = NULL;
279 int new_obd_minor = 0;
281 if (strlen(name) >= MAX_OBD_NAME) {
282 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
283 RETURN(ERR_PTR(-EINVAL));
/* takes a reference on the type; it is put again on every failure path
 * visible below */
286 type = class_get_type(type_name);
288 CERROR("OBD: unknown type: %s\n", type_name);
289 RETURN(ERR_PTR(-ENODEV));
292 newdev = obd_device_alloc();
293 if (newdev == NULL) {
294 class_put_type(type);
295 RETURN(ERR_PTR(-ENOMEM));
297 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* a single pass under obd_dev_lock both checks for a duplicate name and
 * claims a free slot */
299 cfs_spin_lock(&obd_dev_lock);
300 for (i = 0; i < class_devno_max(); i++) {
301 struct obd_device *obd = class_num2obd(i);
302 if (obd && obd->obd_name &&
303 (strcmp(name, obd->obd_name) == 0)) {
304 CERROR("Device %s already exists, won't add\n", name);
/* a slot may already have been claimed earlier in this scan; it is
 * sanity-checked and released again before reporting -EEXIST */
306 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
307 "%p obd_magic %08x != %08x\n", result,
308 result->obd_magic, OBD_DEVICE_MAGIC);
309 LASSERTF(result->obd_minor == new_obd_minor,
310 "%p obd_minor %d != %d\n", result,
311 result->obd_minor, new_obd_minor);
313 obd_devs[result->obd_minor] = NULL;
314 result->obd_name[0]='\0';
316 result = ERR_PTR(-EEXIST);
/* first empty slot seen while nothing has been claimed yet */
319 if (!result && !obd) {
321 result->obd_minor = i;
323 result->obd_type = type;
/* copies at most sizeof(obd_name) - 1 bytes; termination relies on the
 * last byte already being zero -- TODO confirm the slab object is zeroed */
324 strncpy(result->obd_name, name,
325 sizeof(result->obd_name) - 1);
326 obd_devs[i] = result;
329 cfs_spin_unlock(&obd_dev_lock);
331 if (result == NULL && i >= class_devno_max()) {
332 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
334 result = ERR_PTR(-EOVERFLOW);
/* on any failure the unused device and the type reference are released */
337 if (IS_ERR(result)) {
338 obd_device_free(newdev);
339 class_put_type(type);
341 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
342 result->obd_name, result);
/* Undo class_newdev(): clear the device's obd_devs[] slot under
 * obd_dev_lock, free the device and put its type reference.
 * The type pointer is saved up front because the device is freed before
 * class_put_type() is called. */
347 void class_release_dev(struct obd_device *obd)
349 struct obd_type *obd_type = obd->obd_type;
351 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
352 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
353 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
354 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
355 LASSERT(obd_type != NULL);
357 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
358 obd->obd_name,obd->obd_type->typ_name);
360 cfs_spin_lock(&obd_dev_lock);
361 obd_devs[obd->obd_minor] = NULL;
362 cfs_spin_unlock(&obd_dev_lock);
363 obd_device_free(obd);
365 class_put_type(obd_type);
/* Scan obd_devs[] under obd_dev_lock for a device called \a name.
 * Only a device with obd_attached set is accepted on the visible match path.
 * NOTE(review): the return statements (index on success, value on failure)
 * are not visible in this listing. */
368 int class_name2dev(const char *name)
375 cfs_spin_lock(&obd_dev_lock);
376 for (i = 0; i < class_devno_max(); i++) {
377 struct obd_device *obd = class_num2obd(i);
378 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
379 /* Make sure we finished attaching before we give
380 out any references */
381 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
382 if (obd->obd_attached) {
383 cfs_spin_unlock(&obd_dev_lock);
389 cfs_spin_unlock(&obd_dev_lock);
394 struct obd_device *class_name2obd(const char *name)
396 int dev = class_name2dev(name);
398 if (dev < 0 || dev > class_devno_max())
400 return class_num2obd(dev);
/* Scan obd_devs[] under obd_dev_lock for a device whose obd_uuid equals
 * \a uuid.  The lock is released on both the match and the no-match path.
 * NOTE(review): the return statements are not visible in this listing. */
403 int class_uuid2dev(struct obd_uuid *uuid)
407 cfs_spin_lock(&obd_dev_lock);
408 for (i = 0; i < class_devno_max(); i++) {
409 struct obd_device *obd = class_num2obd(i);
410 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
411 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
412 cfs_spin_unlock(&obd_dev_lock);
416 cfs_spin_unlock(&obd_dev_lock);
/* Resolve \a uuid to a device: class_uuid2dev() supplies the index and
 * class_num2obd() the device stored there.
 * NOTE(review): any check on the index between the two calls is not visible
 * in this listing. */
421 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
423 int dev = class_uuid2dev(uuid);
426 return class_num2obd(dev);
430 /* Get obd device from ::obd_devs[]
432 * \param num [in] array index
434 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
435 * otherwise return the obd device there. */
/* Return the device for index \a num.
 * Only indices below class_devno_max() are considered; a device that is
 * found is sanity-checked for its magic and for obd_minor == num.
 * NOTE(review): the array read, its NULL guard and the return statement are
 * not visible in this listing; no lower-bound check on \a num is visible
 * either. */
437 struct obd_device *class_num2obd(int num)
439 struct obd_device *obd = NULL;
441 if (num < class_devno_max()) {
446 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
447 "%p obd_magic %08x != %08x\n",
448 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
449 LASSERTF(obd->obd_minor == num,
450 "%p obd_minor %0d != %0d\n",
451 obd, obd->obd_minor, num);
/* Print one console line per device in obd_devs[] while holding
 * obd_dev_lock: index, status, type name, device name, uuid and refcount.
 * The status is chosen by testing obd_stopping, then obd_set_up, then
 * obd_attached, in that order.
 * NOTE(review): the status strings and the NULL-slot guard are not visible
 * in this listing. */
457 void class_obd_list(void)
462 cfs_spin_lock(&obd_dev_lock);
463 for (i = 0; i < class_devno_max(); i++) {
464 struct obd_device *obd = class_num2obd(i);
467 if (obd->obd_stopping)
469 else if (obd->obd_set_up)
471 else if (obd->obd_attached)
475 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
476 i, status, obd->obd_type->typ_name,
477 obd->obd_name, obd->obd_uuid.uuid,
478 cfs_atomic_read(&obd->obd_refcount));
480 cfs_spin_unlock(&obd_dev_lock);
484 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
485 specified, then only the client with that uuid is returned,
486 otherwise any client connected to the tgt is returned. */
/* Scan obd_devs[] under obd_dev_lock for a client device of type
 * \a typ_name whose cl_target_uuid equals \a tgt_uuid; when \a grp_uuid is
 * non-NULL the device's own obd_uuid must equal it as well.
 * NOTE(review): the NULL-slot guard and the return statements are not
 * visible in this listing. */
487 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
488 const char * typ_name,
489 struct obd_uuid *grp_uuid)
493 cfs_spin_lock(&obd_dev_lock);
494 for (i = 0; i < class_devno_max(); i++) {
495 struct obd_device *obd = class_num2obd(i);
/* the comparison length is strlen(typ_name), so this is a prefix match on
 * the device's type name */
498 if ((strncmp(obd->obd_type->typ_name, typ_name,
499 strlen(typ_name)) == 0)) {
500 if (obd_uuid_equals(tgt_uuid,
501 &obd->u.cli.cl_target_uuid) &&
502 ((grp_uuid)? obd_uuid_equals(grp_uuid,
503 &obd->obd_uuid) : 1)) {
504 cfs_spin_unlock(&obd_dev_lock);
509 cfs_spin_unlock(&obd_dev_lock);
514 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
515 searching at *next, and if a device is found, the next index to look
516 at is saved in *next. If next is NULL, then the first matching device
517 will always be returned. */
/* Scan obd_devs[] under obd_dev_lock, starting at an index derived from
 * \a next, for a device whose obd_uuid equals \a grp_uuid.  A value of
 * *next is only used as a start index when it lies in
 * [0, class_devno_max()).
 * NOTE(review): the start-index assignments, the update of *next and the
 * return statements are not visible in this listing. */
518 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
524 else if (*next >= 0 && *next < class_devno_max())
529 cfs_spin_lock(&obd_dev_lock);
530 for (; i < class_devno_max(); i++) {
531 struct obd_device *obd = class_num2obd(i);
534 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
537 cfs_spin_unlock(&obd_dev_lock);
541 cfs_spin_unlock(&obd_dev_lock);
547 /* Notify that the sptlrpc log for \a fsname has changed, so that every
548 * relevant OBD can adjust its sptlrpc settings accordingly. */
/* Send KEY_SPTLRPC_CONF via obd_set_info_async() to every set-up,
 * non-stopping mdc/osc/mdt/ost device whose name starts with the first
 * \a namelen bytes of \a fsname.
 * NOTE(review): how the per-device results (rc2) are combined into the
 * return value is not visible in this listing. */
550 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
552 struct obd_device *obd;
556 LASSERT(namelen > 0);
558 cfs_spin_lock(&obd_dev_lock);
559 for (i = 0; i < class_devno_max(); i++) {
560 obd = class_num2obd(i);
562 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
565 /* only notify mdc, osc, mdt, ost */
566 type = obd->obd_type->typ_name;
567 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
568 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
569 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
570 strcmp(type, LUSTRE_OST_NAME) != 0)
573 if (strncmp(obd->obd_name, fsname, namelen))
/* the device is pinned with class_incref() so that obd_dev_lock can be
 * dropped around the call; the lock is retaken before the scan goes on */
576 class_incref(obd, __FUNCTION__, obd);
577 cfs_spin_unlock(&obd_dev_lock);
578 rc2 = obd_set_info_async(obd->obd_self_export,
579 sizeof(KEY_SPTLRPC_CONF),
580 KEY_SPTLRPC_CONF, 0, NULL, NULL);
582 class_decref(obd, __FUNCTION__, obd);
583 cfs_spin_lock(&obd_dev_lock);
585 cfs_spin_unlock(&obd_dev_lock);
588 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
590 void obd_cleanup_caches(void)
595 if (obd_device_cachep) {
596 rc = cfs_mem_cache_destroy(obd_device_cachep);
597 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
598 obd_device_cachep = NULL;
601 rc = cfs_mem_cache_destroy(obdo_cachep);
602 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
606 rc = cfs_mem_cache_destroy(import_cachep);
607 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
608 import_cachep = NULL;
611 rc = cfs_mem_cache_destroy(capa_cachep);
612 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
/* Create the obd_device, obdo, import and capa memory caches.
 * Each pointer is asserted to be NULL before its cache is created, so the
 * function must not run twice without obd_cleanup_caches() in between.
 * obd_cleanup_caches() is called from the tail of the function.
 * NOTE(review): most failure checks, the label and the return statements
 * are not visible in this listing. */
618 int obd_init_caches(void)
622 LASSERT(obd_device_cachep == NULL);
623 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
624 sizeof(struct obd_device),
626 if (!obd_device_cachep)
629 LASSERT(obdo_cachep == NULL);
630 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
635 LASSERT(import_cachep == NULL);
636 import_cachep = cfs_mem_cache_create("ll_import_cache",
637 sizeof(struct obd_import),
642 LASSERT(capa_cachep == NULL);
643 capa_cachep = cfs_mem_cache_create("capa_cache",
644 sizeof(struct obd_capa), 0, 0);
650 obd_cleanup_caches();
655 /* map connection to client */
/* Translate a connection handle into its export by looking up conn->cookie
 * with class_handle2object().  A null handle and the cookie value -1 ("new
 * connection") are special-cased before the lookup.
 * NOTE(review): the guards and return statements of the special cases are
 * not visible in this listing. */
656 struct obd_export *class_conn2export(struct lustre_handle *conn)
658 struct obd_export *export;
662 CDEBUG(D_CACHE, "looking for null handle\n");
666 if (conn->cookie == -1) { /* this means assign a new connection */
667 CDEBUG(D_CACHE, "want a new connection\n");
671 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
672 export = class_handle2object(conn->cookie);
/* Map an export to an obd device.
 * NOTE(review): only the signature is visible in this listing; the body is
 * missing, so NULL handling cannot be stated here. */
676 struct obd_device *class_exp2obd(struct obd_export *exp)
/* Resolve a connection handle to the obd device behind its export.
 * exp_obd is read first and the export is then released with
 * class_export_put(), so the caller holds no export reference afterwards.
 * NOTE(review): the NULL guard and return statements are not visible in
 * this listing. */
683 struct obd_device *class_conn2obd(struct lustre_handle *conn)
685 struct obd_export *export;
686 export = class_conn2export(conn);
688 struct obd_device *obd = export->exp_obd;
689 class_export_put(export);
/* Return the client import (u.cli.cl_import) of the device an export
 * belongs to.
 * NOTE(review): any NULL guard between the two visible lines is not shown
 * in this listing. */
695 struct obd_import *class_exp2cliimp(struct obd_export *exp)
697 struct obd_device *obd = exp->exp_obd;
700 return obd->u.cli.cl_import;
/* Return the client import (u.cli.cl_import) of the device a connection
 * handle resolves to via class_conn2obd().
 * NOTE(review): any NULL guard between the two visible lines is not shown
 * in this listing. */
703 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
705 struct obd_device *obd = class_conn2obd(conn);
708 return obd->u.cli.cl_import;
711 /* Export management functions */
712 static void class_export_destroy(struct obd_export *exp)
714 struct obd_device *obd = exp->exp_obd;
717 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
719 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
720 exp->exp_client_uuid.uuid, obd->obd_name);
722 LASSERT(obd != NULL);
724 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
725 if (exp->exp_connection)
726 ptlrpc_put_connection_superhack(exp->exp_connection);
728 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
729 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
730 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
731 LASSERT(cfs_list_empty(&exp->exp_queued_rpc));
732 obd_destroy_export(exp);
733 class_decref(obd, "export", exp);
735 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle-table callback registered in class_new_export() through
 * class_handle_hash(); it takes an export reference. */
739 static void export_handle_addref(void *export)
741 class_export_get(export);
/* Take one reference on \a exp and log the new count.
 * NOTE(review): the return statement is not visible in this listing. */
744 struct obd_export *class_export_get(struct obd_export *exp)
746 cfs_atomic_inc(&exp->exp_refcount);
747 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
748 cfs_atomic_read(&exp->exp_refcount));
751 EXPORT_SYMBOL(class_export_get);
/* Drop one reference on \a exp.
 * The count must be strictly positive on entry.  When it reaches zero the
 * export's nid statistics are cleaned up and the export is handed to
 * obd_zombie_export_add() rather than being destroyed inline. */
753 void class_export_put(struct obd_export *exp)
755 LASSERT(exp != NULL);
756 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, 0x5a5a5a);
/* the message prints the count as it will be after the decrement below */
757 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
758 cfs_atomic_read(&exp->exp_refcount) - 1);
760 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
761 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
762 CDEBUG(D_IOCTL, "final put %p/%s\n",
763 exp, exp->exp_client_uuid.uuid);
765 /* release nid stat reference */
766 lprocfs_exp_cleanup(exp);
768 obd_zombie_export_add(exp);
771 EXPORT_SYMBOL(class_export_put);
773 /* Creates a new export, adds it to the hash table, and returns a
774 * pointer to it. The refcount is 2: one for the hash reference, and
775 * one for the pointer returned by this function. */
/* Allocate and initialize an export of \a obd for client \a cluuid, hash
 * its handle, add it to the device's uuid hash (unless \a cluuid is the
 * device's own uuid) and link it onto the device's export lists.
 * Allocation failure yields ERR_PTR(-ENOMEM); the visible failure paths set
 * rc to -ENODEV (device stopping or no uuid hash) or -EALREADY (duplicate
 * uuid) and then unhash, destroy and free the export.
 * NOTE(review): the labels, a few guards and the final return statements
 * are not visible in this listing. */
776 struct obd_export *class_new_export(struct obd_device *obd,
777 struct obd_uuid *cluuid)
779 struct obd_export *export;
780 cfs_hash_t *hash = NULL;
784 OBD_ALLOC_PTR(export);
786 return ERR_PTR(-ENOMEM);
788 export->exp_conn_cnt = 0;
789 export->exp_lock_hash = NULL;
/* starts at 2, as described in the comment above this function */
790 cfs_atomic_set(&export->exp_refcount, 2);
791 cfs_atomic_set(&export->exp_rpc_count, 0);
792 cfs_atomic_set(&export->exp_cb_count, 0);
793 cfs_atomic_set(&export->exp_locks_count, 0);
794 #if LUSTRE_TRACKS_LOCK_EXP_REFS
795 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
796 cfs_spin_lock_init(&export->exp_locks_list_guard);
798 cfs_atomic_set(&export->exp_replay_count, 0);
799 export->exp_obd = obd;
800 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
801 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
802 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
803 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
804 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
805 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
806 class_handle_hash(&export->exp_handle, export_handle_addref);
807 export->exp_last_request_time = cfs_time_current_sec();
808 cfs_spin_lock_init(&export->exp_lock);
809 cfs_spin_lock_init(&export->exp_rpc_lock);
810 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
811 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
813 export->exp_sp_peer = LUSTRE_SP_ANY;
814 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
815 export->exp_client_uuid = *cluuid;
816 obd_init_export(export);
/* first locked section: bail out if the device is stopping and take a
 * reference on its uuid hash */
818 cfs_spin_lock(&obd->obd_dev_lock);
819 /* shouldn't happen, but might race */
820 if (obd->obd_stopping)
821 GOTO(exit_unlock, rc = -ENODEV);
823 hash = cfs_hash_getref(obd->obd_uuid_hash);
825 GOTO(exit_unlock, rc = -ENODEV);
826 cfs_spin_unlock(&obd->obd_dev_lock);
/* the hash insertion runs with obd_dev_lock dropped; an export carrying the
 * device's own uuid is not hashed */
828 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
829 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
831 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
832 obd->obd_name, cluuid->uuid, rc);
833 GOTO(exit_err, rc = -EALREADY);
/* second locked section: obd_stopping is tested again because the lock was
 * released above; the hash entry is removed before bailing out */
837 cfs_spin_lock(&obd->obd_dev_lock);
838 if (obd->obd_stopping) {
839 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
840 GOTO(exit_unlock, rc = -ENODEV);
/* this device reference is released again in class_export_destroy() */
843 class_incref(obd, "export", export);
844 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
845 cfs_list_add_tail(&export->exp_obd_chain_timed,
846 &export->exp_obd->obd_exports_timed);
847 export->exp_obd->obd_num_exports++;
848 cfs_spin_unlock(&obd->obd_dev_lock);
849 cfs_hash_putref(hash);
/* failure paths: drop the lock (if held) and the hash reference, then undo
 * the handle hashing and free the export */
853 cfs_spin_unlock(&obd->obd_dev_lock);
856 cfs_hash_putref(hash);
857 class_handle_unhash(&export->exp_handle);
858 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
859 obd_destroy_export(export);
860 OBD_FREE_PTR(export);
863 EXPORT_SYMBOL(class_new_export);
/* Detach an export from its device: unhash the handle, remove the export
 * from the uuid hash if it is hashed there, move it to the device's
 * obd_unlinked_exports list, take it off the timed list, decrement
 * obd_num_exports, and finally drop one export reference. */
865 void class_unlink_export(struct obd_export *exp)
867 class_handle_unhash(&exp->exp_handle);
869 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
870 /* delete an uuid-export hashitem from hashtables */
871 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
872 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
873 &exp->exp_client_uuid,
874 &exp->exp_uuid_hash);
876 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
877 cfs_list_del_init(&exp->exp_obd_chain_timed);
878 exp->exp_obd->obd_num_exports--;
879 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
/* the put happens only after obd_dev_lock has been released */
880 class_export_put(exp);
882 EXPORT_SYMBOL(class_unlink_export);
884 /* Import management functions */
/* Final teardown of an import whose reference count is zero: drop its
 * connection, drain and free every entry of imp_conn_list, release the
 * "import" reference on the obd device and free the import through
 * OBD_FREE_RCU().  imp_sec must already be NULL. */
885 void class_import_destroy(struct obd_import *imp)
889 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
890 imp->imp_obd->obd_name);
892 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
894 ptlrpc_put_connection_superhack(imp->imp_connection);
/* pop entries from the head until the list is empty */
896 while (!cfs_list_empty(&imp->imp_conn_list)) {
897 struct obd_import_conn *imp_conn;
899 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
900 struct obd_import_conn, oic_item);
901 cfs_list_del_init(&imp_conn->oic_item);
902 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
903 OBD_FREE(imp_conn, sizeof(*imp_conn));
906 LASSERT(imp->imp_sec == NULL);
/* pairs with class_incref(obd, "import", imp) in class_new_import() */
907 class_decref(imp->imp_obd, "import", imp);
908 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Handle-table callback registered in class_new_import() through
 * class_handle_hash(); it takes an import reference. */
912 static void import_handle_addref(void *import)
914 class_import_get(import);
/* Take one reference on \a import and log the new count together with the
 * owning device's name.
 * NOTE(review): the return statement is not visible in this listing. */
917 struct obd_import *class_import_get(struct obd_import *import)
919 cfs_atomic_inc(&import->imp_refcount);
920 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
921 cfs_atomic_read(&import->imp_refcount),
922 import->imp_obd->obd_name);
925 EXPORT_SYMBOL(class_import_get);
927 void class_import_put(struct obd_import *imp)
931 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
932 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, 0x5a5a5a);
934 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
935 cfs_atomic_read(&imp->imp_refcount) - 1,
936 imp->imp_obd->obd_name);
938 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
939 CDEBUG(D_INFO, "final put import %p\n", imp);
940 obd_zombie_import_add(imp);
945 EXPORT_SYMBOL(class_import_put);
/* Initialize the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service estimates
 * starts at INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the last at_init() argument for the service estimates is
 * not visible in this listing. */
947 static void init_imp_at(struct imp_at *at) {
949 at_init(&at->iat_net_latency, 0, 0);
950 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
951 /* max service estimates are tracked on the server side, so
952 don't use the AT history here, just use the last reported
953 val. (But keep hist for proc histogram, worst_ever) */
954 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/* Allocate and initialize an import for \a obd.
 * The import starts in state LUSTRE_IMP_NEW with a reference count of 2,
 * holds an "import" reference on the device, has its handle hashed and its
 * adaptive-timeout state initialized.
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this listing. */
959 struct obd_import *class_new_import(struct obd_device *obd)
961 struct obd_import *imp;
963 OBD_ALLOC(imp, sizeof(*imp));
967 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
968 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
969 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
970 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
971 cfs_spin_lock_init(&imp->imp_lock);
972 imp->imp_last_success_conn = 0;
973 imp->imp_state = LUSTRE_IMP_NEW;
/* this device reference is released again in class_import_destroy() */
974 imp->imp_obd = class_incref(obd, "import", imp);
975 cfs_sema_init(&imp->imp_sec_mutex, 1);
976 cfs_waitq_init(&imp->imp_recovery_waitq);
978 cfs_atomic_set(&imp->imp_refcount, 2);
979 cfs_atomic_set(&imp->imp_unregistering, 0);
980 cfs_atomic_set(&imp->imp_inflight, 0);
981 cfs_atomic_set(&imp->imp_replay_inflight, 0);
982 cfs_atomic_set(&imp->imp_inval_count, 0);
983 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
984 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
985 class_handle_hash(&imp->imp_handle, import_handle_addref);
986 init_imp_at(&imp->imp_at);
988 /* the default magic is V2, will be used in connect RPC, and
989 * then adjusted according to the flags in request/reply. */
990 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
994 EXPORT_SYMBOL(class_new_import);
/* Start tearing down an import: unhash its handle, bump imp_generation
 * under imp_lock and drop one reference.  The import itself is only freed
 * once the last reference is put (see class_import_put()). */
996 void class_destroy_import(struct obd_import *import)
998 LASSERT(import != NULL);
999 LASSERT(import != LP_POISON);
1001 class_handle_unhash(&import->imp_handle);
1003 cfs_spin_lock(&import->imp_lock);
1004 import->imp_generation++;
1005 cfs_spin_unlock(&import->imp_lock);
1006 class_import_put(import);
1008 EXPORT_SYMBOL(class_destroy_import);
1010 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Record that \a lock references \a exp (built only when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is set).
 * Runs under exp_locks_list_guard.  The first reference links the lock onto
 * exp_locks_list and records the export as the lock's target; a lock that
 * already points at a different export triggers a console warning. */
1012 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1014 cfs_spin_lock(&exp->exp_locks_list_guard);
1016 LASSERT(lock->l_exp_refs_nr >= 0);
1018 if (lock->l_exp_refs_target != NULL &&
1019 lock->l_exp_refs_target != exp) {
1020 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1021 exp, lock, lock->l_exp_refs_target);
/* post-increment: the branch is taken only for the 0 -> 1 transition */
1023 if ((lock->l_exp_refs_nr ++) == 0) {
1024 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1025 lock->l_exp_refs_target = exp;
1027 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1028 lock, exp, lock->l_exp_refs_nr);
1029 cfs_spin_unlock(&exp->exp_locks_list_guard);
1031 EXPORT_SYMBOL(__class_export_add_lock_ref);
/* Remove one reference of \a lock on \a exp; counterpart of
 * __class_export_add_lock_ref().
 * Runs under exp_locks_list_guard.  Dropping the last reference unlinks the
 * lock from the export's list and clears its target; a target that differs
 * from \a exp triggers a console warning. */
1033 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1035 cfs_spin_lock(&exp->exp_locks_list_guard);
1036 LASSERT(lock->l_exp_refs_nr > 0);
1037 if (lock->l_exp_refs_target != exp) {
1038 LCONSOLE_WARN("lock %p, "
1039 "mismatching export pointers: %p, %p\n",
1040 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: the branch is taken only for the 1 -> 0 transition */
1042 if (-- lock->l_exp_refs_nr == 0) {
1043 cfs_list_del_init(&lock->l_exp_refs_link);
1044 lock->l_exp_refs_target = NULL;
1046 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1047 lock, exp, lock->l_exp_refs_nr);
1048 cfs_spin_unlock(&exp->exp_locks_list_guard);
1050 EXPORT_SYMBOL(__class_export_del_lock_ref);
1053 /* A connection defines an export context in which preallocation can
1054 be managed. This releases the export pointer reference, and returns
1055 the export handle, so the export refcount is 1 when this function
1056 returns. */
/* Create a new export of \a obd for client \a cluuid and return its handle
 * cookie in \a conn.
 * The pointer reference obtained from class_new_export() is put before
 * returning, so the caller is left with the cookie only.  A failure of
 * class_new_export() is returned as PTR_ERR(export).
 * NOTE(review): the error guard and the success return are not visible in
 * this listing. */
1057 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1058 struct obd_uuid *cluuid)
1060 struct obd_export *export;
1061 LASSERT(conn != NULL);
1062 LASSERT(obd != NULL);
1063 LASSERT(cluuid != NULL);
1066 export = class_new_export(obd, cluuid);
1068 RETURN(PTR_ERR(export));
1070 conn->cookie = export->exp_handle.h_cookie;
1071 class_export_put(export);
1073 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1074 cluuid->uuid, conn->cookie);
1077 EXPORT_SYMBOL(class_connect);
1079 /* if export is involved in recovery then clean up related things */
/* Remove an export's contribution to the device's recovery accounting:
 * delayed and connected client counts (under obd_recovery_task_lock) and
 * the request-replay and lock-replay client counters.  The export's own
 * flags are cleared under exp_lock. */
1080 void class_export_recovery_cleanup(struct obd_export *exp)
1082 struct obd_device *obd = exp->exp_obd;
1084 cfs_spin_lock(&obd->obd_recovery_task_lock);
1085 if (exp->exp_delayed)
1086 obd->obd_delayed_clients--;
1087 if (obd->obd_recovering && exp->exp_in_recovery) {
/* exp_lock is taken while obd_recovery_task_lock is still held */
1088 cfs_spin_lock(&exp->exp_lock);
1089 exp->exp_in_recovery = 0;
1090 cfs_spin_unlock(&exp->exp_lock);
1091 LASSERT(obd->obd_connected_clients);
1092 obd->obd_connected_clients--;
1094 cfs_spin_unlock(&obd->obd_recovery_task_lock);
1095 /** Cleanup req replay fields */
1096 if (exp->exp_req_replay_needed) {
1097 cfs_spin_lock(&exp->exp_lock);
1098 exp->exp_req_replay_needed = 0;
1099 cfs_spin_unlock(&exp->exp_lock);
1100 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1101 cfs_atomic_dec(&obd->obd_req_replay_clients);
1103 /** Cleanup lock replay data */
1104 if (exp->exp_lock_replay_needed) {
1105 cfs_spin_lock(&exp->exp_lock);
1106 exp->exp_lock_replay_needed = 0;
1107 cfs_spin_unlock(&exp->exp_lock);
1108 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1109 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1113 /* This function removes 1-3 references from the export:
1114 * 1 - for the export pointer passed
1115 * and, if the disconnect is really needed,
1116 * 2 - removing from hash
1117 * 3 - in class_unlink_export
1118 * The export pointer passed to this function can be destroyed */
/* Disconnect an export.
 * exp_disconnected is tested and set under exp_lock so that only the first
 * caller performs the teardown: removal from the nid hash, recovery
 * cleanup and class_unlink_export().  One export reference is put at the
 * end of the function.
 * NOTE(review): the label and the return statements are not visible in this
 * listing. */
1119 int class_disconnect(struct obd_export *export)
1121 int already_disconnected;
1124 if (export == NULL) {
1126 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1130 cfs_spin_lock(&export->exp_lock);
1131 already_disconnected = export->exp_disconnected;
1132 export->exp_disconnected = 1;
1133 cfs_spin_unlock(&export->exp_lock);
1135 /* class_cleanup(), abort_recovery(), and class_fail_export()
1136 * all end up in here, and if any of them race we shouldn't
1137 * call extra class_export_puts(). */
1138 if (already_disconnected) {
1139 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1140 GOTO(no_disconn, already_disconnected);
1143 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1144 export->exp_handle.h_cookie);
1146 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1147 cfs_hash_del(export->exp_obd->obd_nid_hash,
1148 &export->exp_connection->c_peer.nid,
1149 &export->exp_nid_hash);
1151 class_export_recovery_cleanup(export);
1152 class_unlink_export(export);
1154 class_export_put(export);
1158 /* Return non-zero for a fully connected export */
/* Report whether \a exp is connected, i.e. exp_conn_cnt > 0, sampled under
 * exp_lock.
 * NOTE(review): any NULL guard and the return statement are not visible in
 * this listing. */
1159 int class_connected_export(struct obd_export *exp)
1163 cfs_spin_lock(&exp->exp_lock);
1164 connected = (exp->exp_conn_cnt > 0);
1165 cfs_spin_unlock(&exp->exp_lock);
1170 EXPORT_SYMBOL(class_connected_export);
/* Disconnect every export on \a list, always working on the list head.
 * Each export gets \a flags stored in exp_flags.  An export whose client
 * uuid equals its device's uuid is only unlinked from the list; every other
 * export is passed to obd_disconnect(). */
1172 static void class_disconnect_export_list(cfs_list_t *list,
1173 enum obd_option flags)
1176 struct obd_export *exp;
1179 /* It's possible that an export may disconnect itself, but
1180 * nothing else will be added to this list. */
1181 while (!cfs_list_empty(list)) {
1182 exp = cfs_list_entry(list->next, struct obd_export,
1184 /* need for safe call CDEBUG after obd_disconnect */
1185 class_export_get(exp);
1187 cfs_spin_lock(&exp->exp_lock);
1188 exp->exp_flags = flags;
1189 cfs_spin_unlock(&exp->exp_lock);
1191 if (obd_uuid_equals(&exp->exp_client_uuid,
1192 &exp->exp_obd->obd_uuid)) {
1194 "exp %p export uuid == obd uuid, don't discon\n",
1196 /* Need to delete this now so we don't end up pointing
1197 * to work_list later when this export is cleaned up. */
1198 cfs_list_del_init(&exp->exp_obd_chain);
1199 class_export_put(exp);
/* second reference: per the comment below, obd_disconnect() releases one
 * reference itself; the first one taken above keeps exp valid for the
 * CDEBUG that follows the call */
1203 class_export_get(exp);
1204 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1205 "last request at "CFS_TIME_T"\n",
1206 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1207 exp, exp->exp_last_request_time);
1208 /* release one export reference anyway */
1209 rc = obd_disconnect(exp);
1211 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1212 obd_export_nid2str(exp), exp, rc);
1213 class_export_put(exp);
/* Disconnect all exports of \a obd.
 * obd_exports and obd_delayed_exports are spliced onto a private list under
 * obd_dev_lock; the disconnects then run on that list with the lock
 * released, using the flags from exp_flags_from_obd(). */
1218 void class_disconnect_exports(struct obd_device *obd)
1220 cfs_list_t work_list;
1223 /* Move all of the exports from obd_exports to a work list, en masse. */
1224 CFS_INIT_LIST_HEAD(&work_list);
1225 cfs_spin_lock(&obd->obd_dev_lock);
1226 cfs_list_splice_init(&obd->obd_exports, &work_list);
1227 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1228 cfs_spin_unlock(&obd->obd_dev_lock);
1230 if (!cfs_list_empty(&work_list)) {
1231 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1232 "disconnecting them\n", obd->obd_minor, obd);
1233 class_disconnect_export_list(&work_list,
1234 exp_flags_from_obd(obd));
1236 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1237 obd->obd_minor, obd);
1240 EXPORT_SYMBOL(class_disconnect_exports);
1242 /* Remove exports that have not completed recovery. */
/* Evict the exports of \a obd selected with the \a test_export callback.
 * Candidates are moved from obd_exports to a private list under
 * obd_dev_lock, the device's obd_stale_clients counter is increased by the
 * number evicted, and the private list is disconnected with
 * OBD_OPT_ABORT_RECOV added to the device's flags.
 * NOTE(review): the statements following the two tests in the loop and the
 * counter increment are not visible in this listing; presumably both tests
 * skip the export ("continue") -- confirm. */
1244 void class_disconnect_stale_exports(struct obd_device *obd,
1245 int (*test_export)(struct obd_export *))
1247 cfs_list_t work_list;
1248 cfs_list_t *pos, *n;
1249 struct obd_export *exp;
1253 CFS_INIT_LIST_HEAD(&work_list);
1254 cfs_spin_lock(&obd->obd_dev_lock);
/* the _safe iterator is needed because entries are moved off the list
 * inside the loop */
1255 cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1256 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1257 if (test_export(exp))
1260 /* don't count self-export as client */
1261 if (obd_uuid_equals(&exp->exp_client_uuid,
1262 &exp->exp_obd->obd_uuid))
1265 cfs_list_move(&exp->exp_obd_chain, &work_list);
1267 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1268 obd->obd_name, exp->exp_client_uuid.uuid,
1269 exp->exp_connection == NULL ? "<unknown>" :
1270 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1271 print_export_data(exp, "EVICTING", 0);
1273 cfs_spin_unlock(&obd->obd_dev_lock);
1276 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1277 obd->obd_name, evicted);
1278 obd->obd_stale_clients += evicted;
1280 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1281 OBD_OPT_ABORT_RECOV);
1284 EXPORT_SYMBOL(class_disconnect_stale_exports);
1286 void class_fail_export(struct obd_export *exp)
1288 int rc, already_failed;
1290 cfs_spin_lock(&exp->exp_lock);
1291 already_failed = exp->exp_failed;
1292 exp->exp_failed = 1;
1293 cfs_spin_unlock(&exp->exp_lock);
1295 if (already_failed) {
1296 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1297 exp, exp->exp_client_uuid.uuid);
1301 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1302 exp, exp->exp_client_uuid.uuid);
1304 if (obd_dump_on_timeout)
1305 libcfs_debug_dumplog();
1307 /* Most callers into obd_disconnect are removing their own reference
1308 * (request, for example) in addition to the one from the hash table.
1309 * We don't have such a reference here, so make one. */
1310 class_export_get(exp);
1311 rc = obd_disconnect(exp);
1313 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1315 CDEBUG(D_HA, "disconnected export %p/%s\n",
1316 exp, exp->exp_client_uuid.uuid);
1318 EXPORT_SYMBOL(class_fail_export);
1320 char *obd_export_nid2str(struct obd_export *exp)
1322 if (exp->exp_connection != NULL)
1323 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1327 EXPORT_SYMBOL(obd_export_nid2str);
1329 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1331 struct obd_export *doomed_exp = NULL;
1332 int exports_evicted = 0;
1334 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1337 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1338 if (doomed_exp == NULL)
1341 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1342 "nid %s found, wanted nid %s, requested nid %s\n",
1343 obd_export_nid2str(doomed_exp),
1344 libcfs_nid2str(nid_key), nid);
1345 LASSERTF(doomed_exp != obd->obd_self_export,
1346 "self-export is hashed by NID?\n");
1348 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1349 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1351 class_fail_export(doomed_exp);
1352 class_export_put(doomed_exp);
1355 if (!exports_evicted)
1356 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1357 obd->obd_name, nid);
1358 return exports_evicted;
1360 EXPORT_SYMBOL(obd_export_evict_by_nid);
1362 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1364 struct obd_export *doomed_exp = NULL;
1365 struct obd_uuid doomed_uuid;
1366 int exports_evicted = 0;
1368 obd_str2uuid(&doomed_uuid, uuid);
1369 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1370 CERROR("%s: can't evict myself\n", obd->obd_name);
1371 return exports_evicted;
1374 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1376 if (doomed_exp == NULL) {
1377 CERROR("%s: can't disconnect %s: no exports found\n",
1378 obd->obd_name, uuid);
1380 CWARN("%s: evicting %s at adminstrative request\n",
1381 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1382 class_fail_export(doomed_exp);
1383 class_export_put(doomed_exp);
1387 return exports_evicted;
1389 EXPORT_SYMBOL(obd_export_evict_by_uuid);
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump hook.  print_export_data() calls it for an
 * export when it is non-NULL and the caller asked for lock details. */
void (*class_export_dump_hook)(struct obd_export*) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1396 static void print_export_data(struct obd_export *exp, const char *status,
1399 struct ptlrpc_reply_state *rs;
1400 struct ptlrpc_reply_state *first_reply = NULL;
1403 cfs_spin_lock(&exp->exp_lock);
1404 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1410 cfs_spin_unlock(&exp->exp_lock);
1412 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1413 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1414 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1415 cfs_atomic_read(&exp->exp_rpc_count),
1416 cfs_atomic_read(&exp->exp_cb_count),
1417 cfs_atomic_read(&exp->exp_locks_count),
1418 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1419 nreplies, first_reply, nreplies > 3 ? "..." : "",
1420 exp->exp_last_committed);
1421 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1422 if (locks && class_export_dump_hook != NULL)
1423 class_export_dump_hook(exp);
1427 void dump_exports(struct obd_device *obd, int locks)
1429 struct obd_export *exp;
1431 cfs_spin_lock(&obd->obd_dev_lock);
1432 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1433 print_export_data(exp, "ACTIVE", locks);
1434 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1435 print_export_data(exp, "UNLINKED", locks);
1436 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1437 print_export_data(exp, "DELAYED", locks);
1438 cfs_spin_unlock(&obd->obd_dev_lock);
1439 cfs_spin_lock(&obd_zombie_impexp_lock);
1440 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1441 print_export_data(exp, "ZOMBIE", locks);
1442 cfs_spin_unlock(&obd_zombie_impexp_lock);
1444 EXPORT_SYMBOL(dump_exports);
1446 void obd_exports_barrier(struct obd_device *obd)
1449 LASSERT(cfs_list_empty(&obd->obd_exports));
1450 cfs_spin_lock(&obd->obd_dev_lock);
1451 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1452 cfs_spin_unlock(&obd->obd_dev_lock);
1453 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1454 cfs_time_seconds(waited));
1455 if (waited > 5 && IS_PO2(waited)) {
1456 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1457 "more than %d seconds. "
1458 "The obd refcount = %d. Is it stuck?\n",
1459 obd->obd_name, waited,
1460 cfs_atomic_read(&obd->obd_refcount));
1461 dump_exports(obd, 1);
1464 cfs_spin_lock(&obd->obd_dev_lock);
1466 cfs_spin_unlock(&obd->obd_dev_lock);
1468 EXPORT_SYMBOL(obd_exports_barrier);
/* Total amount of zombies to be destroyed: incremented when an import or
 * export is queued on a zombie list and decremented by
 * obd_zombie_impexp_cull() after the object has been destroyed.  Updated
 * under obd_zombie_impexp_lock. */
static int zombies_count = 0;
1474 * kill zombie imports and exports
1476 void obd_zombie_impexp_cull(void)
1478 struct obd_import *import;
1479 struct obd_export *export;
1483 cfs_spin_lock(&obd_zombie_impexp_lock);
1486 if (!cfs_list_empty(&obd_zombie_imports)) {
1487 import = cfs_list_entry(obd_zombie_imports.next,
1490 cfs_list_del_init(&import->imp_zombie_chain);
1494 if (!cfs_list_empty(&obd_zombie_exports)) {
1495 export = cfs_list_entry(obd_zombie_exports.next,
1498 cfs_list_del_init(&export->exp_obd_chain);
1501 cfs_spin_unlock(&obd_zombie_impexp_lock);
1503 if (import != NULL) {
1504 class_import_destroy(import);
1505 cfs_spin_lock(&obd_zombie_impexp_lock);
1507 cfs_spin_unlock(&obd_zombie_impexp_lock);
1510 if (export != NULL) {
1511 class_export_destroy(export);
1512 cfs_spin_lock(&obd_zombie_impexp_lock);
1514 cfs_spin_unlock(&obd_zombie_impexp_lock);
1518 } while (import != NULL || export != NULL);
/* Completed by the zombie thread once it has tried to daemonize, whether or
 * not that succeeded; obd_zombie_impexp_init() waits on it. */
static cfs_completion_t obd_zombie_start;
/* Completed by the zombie thread after it leaves its main loop;
 * obd_zombie_impexp_stop() waits on it. */
static cfs_completion_t obd_zombie_stop;
/* Flag word for the zombie thread; see OBD_ZOMBIE_STOP below. */
static unsigned long obd_zombie_flags;
/* Wait queue shared by the zombie thread and obd_zombie_barrier() callers. */
static cfs_waitq_t obd_zombie_waitq;
/* PID recorded by the zombie thread, so that obd_zombie_barrier() can avoid
 * waiting for itself. */
static pid_t obd_zombie_pid;

enum {
        /* NOTE: despite the shift notation this value is passed to
         * cfs_set_bit()/cfs_test_bit() on obd_zombie_flags as a bit
         * NUMBER, not used as a mask. */
        OBD_ZOMBIE_STOP = 1 << 1
};
1533 * check for work for kill zombie import/export thread.
1535 static int obd_zombie_impexp_check(void *arg)
1539 cfs_spin_lock(&obd_zombie_impexp_lock);
1540 rc = (zombies_count == 0) &&
1541 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1542 cfs_spin_unlock(&obd_zombie_impexp_lock);
1548 * Add export to the obd_zombe thread and notify it.
1550 static void obd_zombie_export_add(struct obd_export *exp) {
1551 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1552 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1553 cfs_list_del_init(&exp->exp_obd_chain);
1554 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1555 cfs_spin_lock(&obd_zombie_impexp_lock);
1557 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1558 cfs_spin_unlock(&obd_zombie_impexp_lock);
1560 if (obd_zombie_impexp_notify != NULL)
1561 obd_zombie_impexp_notify();
1565 * Add import to the obd_zombe thread and notify it.
1567 static void obd_zombie_import_add(struct obd_import *imp) {
1568 LASSERT(imp->imp_sec == NULL);
1569 cfs_spin_lock(&obd_zombie_impexp_lock);
1570 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1572 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1573 cfs_spin_unlock(&obd_zombie_impexp_lock);
1575 if (obd_zombie_impexp_notify != NULL)
1576 obd_zombie_impexp_notify();
/**
 * notify import/export destroy thread about new zombie.
 */
static void obd_zombie_impexp_notify(void)
{
        /*
         * Make sure obd_zombie_impexp_thread gets this notification.
         * Otherwise the wakeup could be received only by a waiter in
         * obd_zombie_barrier, which would swallow it and go back to
         * sleep, and a hang would ensue.  Hence the broadcast to all
         * waiters.
         */
        cfs_waitq_broadcast(&obd_zombie_waitq);
}
1593 * check whether obd_zombie is idle
1595 static int obd_zombie_is_idle(void)
1599 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1600 cfs_spin_lock(&obd_zombie_impexp_lock);
1601 rc = (zombies_count == 0);
1602 cfs_spin_unlock(&obd_zombie_impexp_lock);
1607 * wait when obd_zombie import/export queues become empty
1609 void obd_zombie_barrier(void)
1611 struct l_wait_info lwi = { 0 };
1613 if (obd_zombie_pid == cfs_curproc_pid())
1614 /* don't wait for myself */
1616 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1618 EXPORT_SYMBOL(obd_zombie_barrier);
#ifdef __KERNEL__

/**
 * destroy zombie export/import thread.
 *
 * Signals obd_zombie_start once it has tried to daemonize (on success or
 * failure), records its PID in obd_zombie_pid, and then loops until
 * OBD_ZOMBIE_STOP is set: sleep until there is work or a stop request,
 * cull the zombie lists, wake a waiter.  Signals obd_zombie_stop on exit
 * from the loop.
 *
 * NOTE(review): when cfs_daemonize_ctxt() fails the function returns
 * without ever completing obd_zombie_stop -- confirm callers cope.
 */
static int obd_zombie_impexp_thread(void *unused)
{
        int rc;

        if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
                /* let obd_zombie_impexp_init() continue even on failure */
                cfs_complete(&obd_zombie_start);
                RETURN(rc);
        }

        cfs_complete(&obd_zombie_start);

        obd_zombie_pid = cfs_curproc_pid();

        while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
                struct l_wait_info lwi = { 0 };

                /* obd_zombie_impexp_check() is non-zero while there is
                 * nothing to do, hence the negation */
                l_wait_event(obd_zombie_waitq,
                             !obd_zombie_impexp_check(NULL), &lwi);
                obd_zombie_impexp_cull();

                /*
                 * Notify obd_zombie_barrier callers that queues
                 * may be empty.
                 */
                cfs_waitq_signal(&obd_zombie_waitq);
        }

        cfs_complete(&obd_zombie_stop);

        RETURN(0);
}

#else /* ! KERNEL */

/* Guards obd_zombie_impexp_kill() against nested invocation. */
static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
/* Handles returned by the liblustre callback registration in
 * obd_zombie_impexp_init(); used to deregister in obd_zombie_impexp_stop(). */
static void *obd_zombie_impexp_work_cb;
static void *obd_zombie_impexp_idle_cb;

/**
 * liblustre wait callback: cull the zombie lists.
 *
 * The cull runs only when this is the sole active invocation
 * (zombie_recur goes from 0 to 1).
 *
 * @param arg  unused
 * @returns 1 if this call ran the cull, 0 if it was skipped
 */
int obd_zombie_impexp_kill(void *arg)
{
        int rc = 0;

        if (cfs_atomic_inc_return(&zombie_recur) == 1) {
                obd_zombie_impexp_cull();
                rc = 1;
        }
        cfs_atomic_dec(&zombie_recur);
        return rc;
}

#endif
/**
 * start destroy zombie import/export thread
 *
 * Initializes the zombie lists, lock, completions and wait queue.  In the
 * kernel build it then starts obd_zombie_impexp_thread() and waits for it
 * to signal obd_zombie_start; in the liblustre build it registers the wait
 * and idle callbacks instead.
 *
 * @returns 0 on success, or the negative value returned by
 *          cfs_create_thread() when the thread cannot be created
 */
int obd_zombie_impexp_init(void)
{
        int rc;

        CFS_INIT_LIST_HEAD(&obd_zombie_imports);
        CFS_INIT_LIST_HEAD(&obd_zombie_exports);
        cfs_spin_lock_init(&obd_zombie_impexp_lock);
        cfs_init_completion(&obd_zombie_start);
        cfs_init_completion(&obd_zombie_stop);
        cfs_waitq_init(&obd_zombie_waitq);
        obd_zombie_pid = 0;

#ifdef __KERNEL__
        rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
        if (rc < 0)
                RETURN(rc);

        /* do not return before the thread has tried to daemonize */
        cfs_wait_for_completion(&obd_zombie_start);
#else

        obd_zombie_impexp_work_cb =
                liblustre_register_wait_callback("obd_zombi_impexp_kill",
                                                 &obd_zombie_impexp_kill, NULL);

        obd_zombie_impexp_idle_cb =
                liblustre_register_idle_callback("obd_zombi_impexp_check",
                                                 &obd_zombie_impexp_check, NULL);
#endif
        RETURN(0);
}
/**
 * stop destroy zombie import/export thread
 *
 * Sets OBD_ZOMBIE_STOP and wakes all waiters on obd_zombie_waitq.  The
 * kernel build then waits for the thread to signal obd_zombie_stop; the
 * liblustre build deregisters the callbacks installed by
 * obd_zombie_impexp_init().
 */
void obd_zombie_impexp_stop(void)
{
        cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
        obd_zombie_impexp_notify();
#ifdef __KERNEL__
        cfs_wait_for_completion(&obd_zombie_stop);
#else
        liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
        liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
#endif
}
1726 /***** Kernel-userspace comm helpers *******/
1728 /* Get length of entire message, including header */
1729 int kuc_len(int payload_len)
1731 return sizeof(struct kuc_hdr) + payload_len;
1733 EXPORT_SYMBOL(kuc_len);
1735 /* Get a pointer to kuc header, given a ptr to the payload
1736 * @param p Pointer to payload area
1737 * @returns Pointer to kuc header
1739 struct kuc_hdr * kuc_ptr(void *p)
1741 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1742 LASSERT(lh->kuc_magic == KUC_MAGIC);
1745 EXPORT_SYMBOL(kuc_ptr);
1747 /* Test if payload is part of kuc message
1748 * @param p Pointer to payload area
1751 int kuc_ispayload(void *p)
1753 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1755 if (kh->kuc_magic == KUC_MAGIC)
1760 EXPORT_SYMBOL(kuc_ispayload);
1762 /* Alloc space for a message, and fill in header
1763 * @return Pointer to payload area
1765 void *kuc_alloc(int payload_len, int transport, int type)
1768 int len = kuc_len(payload_len);
1772 return ERR_PTR(-ENOMEM);
1774 lh->kuc_magic = KUC_MAGIC;
1775 lh->kuc_transport = transport;
1776 lh->kuc_msgtype = type;
1777 lh->kuc_msglen = len;
1779 return (void *)(lh + 1);
1781 EXPORT_SYMBOL(kuc_alloc);
/* Free a message obtained from kuc_alloc()
 * Takes pointer to payload area.  The size passed to OBD_FREE is computed
 * from payload_len, so it should be the value that was given to
 * kuc_alloc().
 *
 * Not declared "inline": this is an exported external function, and with
 * C99 inline semantics a bare "inline" definition need not emit the
 * external definition that EXPORT_SYMBOL refers to.
 *
 * @param p Pointer to payload area
 * @param payload_len Length of the payload area
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);