4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 spinlock_t obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 struct obd_type *class_search_type(const char *name)
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 cfs_list_for_each(tmp, &obd_types) {
106 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
128 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129 modname = LUSTRE_OSP_NAME;
131 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132 modname = LUSTRE_MDT_NAME;
134 if (!cfs_request_module("%s", modname)) {
135 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136 type = class_search_type(name);
138 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
144 spin_lock(&type->obd_type_lock);
146 cfs_try_module_get(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
151 EXPORT_SYMBOL(class_get_type);
153 void class_put_type(struct obd_type *type)
156 spin_lock(&type->obd_type_lock);
158 cfs_module_put(type->typ_dt_ops->o_owner);
159 spin_unlock(&type->obd_type_lock);
161 EXPORT_SYMBOL(class_put_type);
163 #define CLASS_MAX_NAME 1024
165 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
166 struct lprocfs_vars *vars, const char *name,
167 struct lu_device_type *ldt)
169 struct obd_type *type;
174 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
176 if (class_search_type(name)) {
177 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
182 OBD_ALLOC(type, sizeof(*type));
186 OBD_ALLOC_PTR(type->typ_dt_ops);
187 OBD_ALLOC_PTR(type->typ_md_ops);
188 OBD_ALLOC(type->typ_name, strlen(name) + 1);
190 if (type->typ_dt_ops == NULL ||
191 type->typ_md_ops == NULL ||
192 type->typ_name == NULL)
195 *(type->typ_dt_ops) = *dt_ops;
196 /* md_ops is optional */
198 *(type->typ_md_ops) = *md_ops;
199 strcpy(type->typ_name, name);
200 spin_lock_init(&type->obd_type_lock);
203 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
205 if (IS_ERR(type->typ_procroot)) {
206 rc = PTR_ERR(type->typ_procroot);
207 type->typ_procroot = NULL;
213 rc = lu_device_type_init(ldt);
218 spin_lock(&obd_types_lock);
219 cfs_list_add(&type->typ_chain, &obd_types);
220 spin_unlock(&obd_types_lock);
225 if (type->typ_name != NULL)
226 OBD_FREE(type->typ_name, strlen(name) + 1);
227 if (type->typ_md_ops != NULL)
228 OBD_FREE_PTR(type->typ_md_ops);
229 if (type->typ_dt_ops != NULL)
230 OBD_FREE_PTR(type->typ_dt_ops);
231 OBD_FREE(type, sizeof(*type));
234 EXPORT_SYMBOL(class_register_type);
236 int class_unregister_type(const char *name)
238 struct obd_type *type = class_search_type(name);
242 CERROR("unknown obd type\n");
246 if (type->typ_refcnt) {
247 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
248 /* This is a bad situation, let's make the best of it */
249 /* Remove ops, but leave the name for debugging */
250 OBD_FREE_PTR(type->typ_dt_ops);
251 OBD_FREE_PTR(type->typ_md_ops);
255 /* we do not use type->typ_procroot as for compatibility purposes
256 * other modules can share names (i.e. lod can use lov entry). so
257 * we can't reference pointer as it can get invalided when another
258 * module removes the entry */
259 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
262 lu_device_type_fini(type->typ_lu);
264 spin_lock(&obd_types_lock);
265 cfs_list_del(&type->typ_chain);
266 spin_unlock(&obd_types_lock);
267 OBD_FREE(type->typ_name, strlen(name) + 1);
268 if (type->typ_dt_ops != NULL)
269 OBD_FREE_PTR(type->typ_dt_ops);
270 if (type->typ_md_ops != NULL)
271 OBD_FREE_PTR(type->typ_md_ops);
272 OBD_FREE(type, sizeof(*type));
274 } /* class_unregister_type */
275 EXPORT_SYMBOL(class_unregister_type);
278 * Create a new obd device.
280 * Find an empty slot in ::obd_devs[], create a new obd device in it.
282 * \param[in] type_name obd device type string.
283 * \param[in] name obd device name.
285 * \retval NULL if create fails, otherwise return the obd device
288 struct obd_device *class_newdev(const char *type_name, const char *name)
290 struct obd_device *result = NULL;
291 struct obd_device *newdev;
292 struct obd_type *type = NULL;
294 int new_obd_minor = 0;
297 if (strlen(name) >= MAX_OBD_NAME) {
298 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
299 RETURN(ERR_PTR(-EINVAL));
302 type = class_get_type(type_name);
304 CERROR("OBD: unknown type: %s\n", type_name);
305 RETURN(ERR_PTR(-ENODEV));
308 newdev = obd_device_alloc();
310 GOTO(out_type, result = ERR_PTR(-ENOMEM));
312 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
314 write_lock(&obd_dev_lock);
315 for (i = 0; i < class_devno_max(); i++) {
316 struct obd_device *obd = class_num2obd(i);
318 if (obd && (strcmp(name, obd->obd_name) == 0)) {
319 CERROR("Device %s already exists at %d, won't add\n",
322 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
323 "%p obd_magic %08x != %08x\n", result,
324 result->obd_magic, OBD_DEVICE_MAGIC);
325 LASSERTF(result->obd_minor == new_obd_minor,
326 "%p obd_minor %d != %d\n", result,
327 result->obd_minor, new_obd_minor);
329 obd_devs[result->obd_minor] = NULL;
330 result->obd_name[0]='\0';
332 result = ERR_PTR(-EEXIST);
335 if (!result && !obd) {
337 result->obd_minor = i;
339 result->obd_type = type;
340 strncpy(result->obd_name, name,
341 sizeof(result->obd_name) - 1);
342 obd_devs[i] = result;
345 write_unlock(&obd_dev_lock);
347 if (result == NULL && i >= class_devno_max()) {
348 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
350 GOTO(out, result = ERR_PTR(-EOVERFLOW));
356 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
357 result->obd_name, result);
361 obd_device_free(newdev);
363 class_put_type(type);
367 void class_release_dev(struct obd_device *obd)
369 struct obd_type *obd_type = obd->obd_type;
371 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
372 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
373 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
374 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
375 LASSERT(obd_type != NULL);
377 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
378 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
380 write_lock(&obd_dev_lock);
381 obd_devs[obd->obd_minor] = NULL;
382 write_unlock(&obd_dev_lock);
383 obd_device_free(obd);
385 class_put_type(obd_type);
388 int class_name2dev(const char *name)
395 read_lock(&obd_dev_lock);
396 for (i = 0; i < class_devno_max(); i++) {
397 struct obd_device *obd = class_num2obd(i);
399 if (obd && strcmp(name, obd->obd_name) == 0) {
400 /* Make sure we finished attaching before we give
401 out any references */
402 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
403 if (obd->obd_attached) {
404 read_unlock(&obd_dev_lock);
410 read_unlock(&obd_dev_lock);
414 EXPORT_SYMBOL(class_name2dev);
416 struct obd_device *class_name2obd(const char *name)
418 int dev = class_name2dev(name);
420 if (dev < 0 || dev > class_devno_max())
422 return class_num2obd(dev);
424 EXPORT_SYMBOL(class_name2obd);
426 int class_uuid2dev(struct obd_uuid *uuid)
430 read_lock(&obd_dev_lock);
431 for (i = 0; i < class_devno_max(); i++) {
432 struct obd_device *obd = class_num2obd(i);
434 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
435 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
436 read_unlock(&obd_dev_lock);
440 read_unlock(&obd_dev_lock);
444 EXPORT_SYMBOL(class_uuid2dev);
446 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
448 int dev = class_uuid2dev(uuid);
451 return class_num2obd(dev);
453 EXPORT_SYMBOL(class_uuid2obd);
456 * Get obd device from ::obd_devs[]
458 * \param num [in] array index
460 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
461 * otherwise return the obd device there.
463 struct obd_device *class_num2obd(int num)
465 struct obd_device *obd = NULL;
467 if (num < class_devno_max()) {
472 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
473 "%p obd_magic %08x != %08x\n",
474 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
475 LASSERTF(obd->obd_minor == num,
476 "%p obd_minor %0d != %0d\n",
477 obd, obd->obd_minor, num);
482 EXPORT_SYMBOL(class_num2obd);
485 * Get obd devices count. Device in any
487 * \retval obd device count
489 int get_devices_count(void)
491 int index, max_index = class_devno_max(), dev_count = 0;
493 read_lock(&obd_dev_lock);
494 for (index = 0; index <= max_index; index++) {
495 struct obd_device *obd = class_num2obd(index);
499 read_unlock(&obd_dev_lock);
503 EXPORT_SYMBOL(get_devices_count);
505 void class_obd_list(void)
510 read_lock(&obd_dev_lock);
511 for (i = 0; i < class_devno_max(); i++) {
512 struct obd_device *obd = class_num2obd(i);
516 if (obd->obd_stopping)
518 else if (obd->obd_set_up)
520 else if (obd->obd_attached)
524 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
525 i, status, obd->obd_type->typ_name,
526 obd->obd_name, obd->obd_uuid.uuid,
527 cfs_atomic_read(&obd->obd_refcount));
529 read_unlock(&obd_dev_lock);
533 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
534 specified, then only the client with that uuid is returned,
535 otherwise any client connected to the tgt is returned. */
536 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
537 const char * typ_name,
538 struct obd_uuid *grp_uuid)
542 read_lock(&obd_dev_lock);
543 for (i = 0; i < class_devno_max(); i++) {
544 struct obd_device *obd = class_num2obd(i);
548 if ((strncmp(obd->obd_type->typ_name, typ_name,
549 strlen(typ_name)) == 0)) {
550 if (obd_uuid_equals(tgt_uuid,
551 &obd->u.cli.cl_target_uuid) &&
552 ((grp_uuid)? obd_uuid_equals(grp_uuid,
553 &obd->obd_uuid) : 1)) {
554 read_unlock(&obd_dev_lock);
559 read_unlock(&obd_dev_lock);
563 EXPORT_SYMBOL(class_find_client_obd);
565 /* Iterate the obd_device list looking devices have grp_uuid. Start
566 searching at *next, and if a device is found, the next index to look
567 at is saved in *next. If next is NULL, then the first matching device
568 will always be returned. */
569 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
575 else if (*next >= 0 && *next < class_devno_max())
580 read_lock(&obd_dev_lock);
581 for (; i < class_devno_max(); i++) {
582 struct obd_device *obd = class_num2obd(i);
586 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
589 read_unlock(&obd_dev_lock);
593 read_unlock(&obd_dev_lock);
597 EXPORT_SYMBOL(class_devices_in_group);
600 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
601 * adjust sptlrpc settings accordingly.
603 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
605 struct obd_device *obd;
609 LASSERT(namelen > 0);
611 read_lock(&obd_dev_lock);
612 for (i = 0; i < class_devno_max(); i++) {
613 obd = class_num2obd(i);
615 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
618 /* only notify mdc, osc, mdt, ost */
619 type = obd->obd_type->typ_name;
620 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
621 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
622 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
623 strcmp(type, LUSTRE_OST_NAME) != 0)
626 if (strncmp(obd->obd_name, fsname, namelen))
629 class_incref(obd, __FUNCTION__, obd);
630 read_unlock(&obd_dev_lock);
631 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
632 sizeof(KEY_SPTLRPC_CONF),
633 KEY_SPTLRPC_CONF, 0, NULL, NULL);
635 class_decref(obd, __FUNCTION__, obd);
636 read_lock(&obd_dev_lock);
638 read_unlock(&obd_dev_lock);
641 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
643 void obd_cleanup_caches(void)
648 if (obd_device_cachep) {
649 rc = cfs_mem_cache_destroy(obd_device_cachep);
650 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
651 obd_device_cachep = NULL;
654 rc = cfs_mem_cache_destroy(obdo_cachep);
655 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
659 rc = cfs_mem_cache_destroy(import_cachep);
660 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
661 import_cachep = NULL;
664 rc = cfs_mem_cache_destroy(capa_cachep);
665 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
671 int obd_init_caches(void)
675 LASSERT(obd_device_cachep == NULL);
676 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
677 sizeof(struct obd_device),
679 if (!obd_device_cachep)
682 LASSERT(obdo_cachep == NULL);
683 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
688 LASSERT(import_cachep == NULL);
689 import_cachep = cfs_mem_cache_create("ll_import_cache",
690 sizeof(struct obd_import),
695 LASSERT(capa_cachep == NULL);
696 capa_cachep = cfs_mem_cache_create("capa_cache",
697 sizeof(struct obd_capa), 0, 0);
703 obd_cleanup_caches();
708 /* map connection to client */
709 struct obd_export *class_conn2export(struct lustre_handle *conn)
711 struct obd_export *export;
715 CDEBUG(D_CACHE, "looking for null handle\n");
719 if (conn->cookie == -1) { /* this means assign a new connection */
720 CDEBUG(D_CACHE, "want a new connection\n");
724 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
725 export = class_handle2object(conn->cookie);
728 EXPORT_SYMBOL(class_conn2export);
730 struct obd_device *class_exp2obd(struct obd_export *exp)
736 EXPORT_SYMBOL(class_exp2obd);
738 struct obd_device *class_conn2obd(struct lustre_handle *conn)
740 struct obd_export *export;
741 export = class_conn2export(conn);
743 struct obd_device *obd = export->exp_obd;
744 class_export_put(export);
749 EXPORT_SYMBOL(class_conn2obd);
751 struct obd_import *class_exp2cliimp(struct obd_export *exp)
753 struct obd_device *obd = exp->exp_obd;
756 return obd->u.cli.cl_import;
758 EXPORT_SYMBOL(class_exp2cliimp);
760 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
762 struct obd_device *obd = class_conn2obd(conn);
765 return obd->u.cli.cl_import;
767 EXPORT_SYMBOL(class_conn2cliimp);
769 /* Export management functions */
770 static void class_export_destroy(struct obd_export *exp)
772 struct obd_device *obd = exp->exp_obd;
775 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
776 LASSERT(obd != NULL);
778 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
779 exp->exp_client_uuid.uuid, obd->obd_name);
781 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
782 if (exp->exp_connection)
783 ptlrpc_put_connection_superhack(exp->exp_connection);
785 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
786 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
787 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
788 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
789 obd_destroy_export(exp);
790 class_decref(obd, "export", exp);
792 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
796 static void export_handle_addref(void *export)
798 class_export_get(export);
801 static struct portals_handle_ops export_handle_ops = {
802 .hop_addref = export_handle_addref,
806 struct obd_export *class_export_get(struct obd_export *exp)
808 cfs_atomic_inc(&exp->exp_refcount);
809 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
810 cfs_atomic_read(&exp->exp_refcount));
813 EXPORT_SYMBOL(class_export_get);
815 void class_export_put(struct obd_export *exp)
817 LASSERT(exp != NULL);
818 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
819 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
820 cfs_atomic_read(&exp->exp_refcount) - 1);
822 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
823 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
824 CDEBUG(D_IOCTL, "final put %p/%s\n",
825 exp, exp->exp_client_uuid.uuid);
827 /* release nid stat refererence */
828 lprocfs_exp_cleanup(exp);
830 obd_zombie_export_add(exp);
833 EXPORT_SYMBOL(class_export_put);
835 /* Creates a new export, adds it to the hash table, and returns a
836 * pointer to it. The refcount is 2: one for the hash reference, and
837 * one for the pointer returned by this function. */
838 struct obd_export *class_new_export(struct obd_device *obd,
839 struct obd_uuid *cluuid)
841 struct obd_export *export;
842 cfs_hash_t *hash = NULL;
846 OBD_ALLOC_PTR(export);
848 return ERR_PTR(-ENOMEM);
850 export->exp_conn_cnt = 0;
851 export->exp_lock_hash = NULL;
852 export->exp_flock_hash = NULL;
853 cfs_atomic_set(&export->exp_refcount, 2);
854 cfs_atomic_set(&export->exp_rpc_count, 0);
855 cfs_atomic_set(&export->exp_cb_count, 0);
856 cfs_atomic_set(&export->exp_locks_count, 0);
857 #if LUSTRE_TRACKS_LOCK_EXP_REFS
858 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
859 spin_lock_init(&export->exp_locks_list_guard);
861 cfs_atomic_set(&export->exp_replay_count, 0);
862 export->exp_obd = obd;
863 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
864 spin_lock_init(&export->exp_uncommitted_replies_lock);
865 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
866 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
867 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
868 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
869 class_handle_hash(&export->exp_handle, &export_handle_ops);
870 export->exp_last_request_time = cfs_time_current_sec();
871 spin_lock_init(&export->exp_lock);
872 spin_lock_init(&export->exp_rpc_lock);
873 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
874 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
875 spin_lock_init(&export->exp_bl_list_lock);
876 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
878 export->exp_sp_peer = LUSTRE_SP_ANY;
879 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
880 export->exp_client_uuid = *cluuid;
881 obd_init_export(export);
883 spin_lock(&obd->obd_dev_lock);
884 /* shouldn't happen, but might race */
885 if (obd->obd_stopping)
886 GOTO(exit_unlock, rc = -ENODEV);
888 hash = cfs_hash_getref(obd->obd_uuid_hash);
890 GOTO(exit_unlock, rc = -ENODEV);
891 spin_unlock(&obd->obd_dev_lock);
893 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
894 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
896 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
897 obd->obd_name, cluuid->uuid, rc);
898 GOTO(exit_err, rc = -EALREADY);
902 spin_lock(&obd->obd_dev_lock);
903 if (obd->obd_stopping) {
904 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
905 GOTO(exit_unlock, rc = -ENODEV);
908 class_incref(obd, "export", export);
909 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
910 cfs_list_add_tail(&export->exp_obd_chain_timed,
911 &export->exp_obd->obd_exports_timed);
912 export->exp_obd->obd_num_exports++;
913 spin_unlock(&obd->obd_dev_lock);
914 cfs_hash_putref(hash);
918 spin_unlock(&obd->obd_dev_lock);
921 cfs_hash_putref(hash);
922 class_handle_unhash(&export->exp_handle);
923 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
924 obd_destroy_export(export);
925 OBD_FREE_PTR(export);
928 EXPORT_SYMBOL(class_new_export);
930 void class_unlink_export(struct obd_export *exp)
932 class_handle_unhash(&exp->exp_handle);
934 spin_lock(&exp->exp_obd->obd_dev_lock);
935 /* delete an uuid-export hashitem from hashtables */
936 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
937 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
938 &exp->exp_client_uuid,
939 &exp->exp_uuid_hash);
941 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
942 cfs_list_del_init(&exp->exp_obd_chain_timed);
943 exp->exp_obd->obd_num_exports--;
944 spin_unlock(&exp->exp_obd->obd_dev_lock);
945 class_export_put(exp);
947 EXPORT_SYMBOL(class_unlink_export);
949 /* Import management functions */
950 void class_import_destroy(struct obd_import *imp)
954 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
955 imp->imp_obd->obd_name);
957 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
959 ptlrpc_put_connection_superhack(imp->imp_connection);
961 while (!cfs_list_empty(&imp->imp_conn_list)) {
962 struct obd_import_conn *imp_conn;
964 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
965 struct obd_import_conn, oic_item);
966 cfs_list_del_init(&imp_conn->oic_item);
967 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
968 OBD_FREE(imp_conn, sizeof(*imp_conn));
971 LASSERT(imp->imp_sec == NULL);
972 class_decref(imp->imp_obd, "import", imp);
973 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
977 static void import_handle_addref(void *import)
979 class_import_get(import);
982 static struct portals_handle_ops import_handle_ops = {
983 .hop_addref = import_handle_addref,
987 struct obd_import *class_import_get(struct obd_import *import)
989 cfs_atomic_inc(&import->imp_refcount);
990 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
991 cfs_atomic_read(&import->imp_refcount),
992 import->imp_obd->obd_name);
995 EXPORT_SYMBOL(class_import_get);
997 void class_import_put(struct obd_import *imp)
1001 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1002 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1004 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1005 cfs_atomic_read(&imp->imp_refcount) - 1,
1006 imp->imp_obd->obd_name);
1008 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1009 CDEBUG(D_INFO, "final put import %p\n", imp);
1010 obd_zombie_import_add(imp);
1013 /* catch possible import put race */
1014 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1017 EXPORT_SYMBOL(class_import_put);
1019 static void init_imp_at(struct imp_at *at) {
1021 at_init(&at->iat_net_latency, 0, 0);
1022 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1023 /* max service estimates are tracked on the server side, so
1024 don't use the AT history here, just use the last reported
1025 val. (But keep hist for proc histogram, worst_ever) */
1026 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1031 struct obd_import *class_new_import(struct obd_device *obd)
1033 struct obd_import *imp;
1035 OBD_ALLOC(imp, sizeof(*imp));
1039 CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1040 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1041 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1042 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1043 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1044 spin_lock_init(&imp->imp_lock);
1045 imp->imp_last_success_conn = 0;
1046 imp->imp_state = LUSTRE_IMP_NEW;
1047 imp->imp_obd = class_incref(obd, "import", imp);
1048 mutex_init(&imp->imp_sec_mutex);
1049 cfs_waitq_init(&imp->imp_recovery_waitq);
1051 cfs_atomic_set(&imp->imp_refcount, 2);
1052 cfs_atomic_set(&imp->imp_unregistering, 0);
1053 cfs_atomic_set(&imp->imp_inflight, 0);
1054 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1055 cfs_atomic_set(&imp->imp_inval_count, 0);
1056 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1057 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1058 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1059 init_imp_at(&imp->imp_at);
1061 /* the default magic is V2, will be used in connect RPC, and
1062 * then adjusted according to the flags in request/reply. */
1063 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1067 EXPORT_SYMBOL(class_new_import);
1069 void class_destroy_import(struct obd_import *import)
1071 LASSERT(import != NULL);
1072 LASSERT(import != LP_POISON);
1074 class_handle_unhash(&import->imp_handle);
1076 spin_lock(&import->imp_lock);
1077 import->imp_generation++;
1078 spin_unlock(&import->imp_lock);
1079 class_import_put(import);
1081 EXPORT_SYMBOL(class_destroy_import);
1083 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1085 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1087 spin_lock(&exp->exp_locks_list_guard);
1089 LASSERT(lock->l_exp_refs_nr >= 0);
1091 if (lock->l_exp_refs_target != NULL &&
1092 lock->l_exp_refs_target != exp) {
1093 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1094 exp, lock, lock->l_exp_refs_target);
1096 if ((lock->l_exp_refs_nr ++) == 0) {
1097 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1098 lock->l_exp_refs_target = exp;
1100 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1101 lock, exp, lock->l_exp_refs_nr);
1102 spin_unlock(&exp->exp_locks_list_guard);
1104 EXPORT_SYMBOL(__class_export_add_lock_ref);
1106 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1108 spin_lock(&exp->exp_locks_list_guard);
1109 LASSERT(lock->l_exp_refs_nr > 0);
1110 if (lock->l_exp_refs_target != exp) {
1111 LCONSOLE_WARN("lock %p, "
1112 "mismatching export pointers: %p, %p\n",
1113 lock, lock->l_exp_refs_target, exp);
1115 if (-- lock->l_exp_refs_nr == 0) {
1116 cfs_list_del_init(&lock->l_exp_refs_link);
1117 lock->l_exp_refs_target = NULL;
1119 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1120 lock, exp, lock->l_exp_refs_nr);
1121 spin_unlock(&exp->exp_locks_list_guard);
1123 EXPORT_SYMBOL(__class_export_del_lock_ref);
1126 /* A connection defines an export context in which preallocation can
1127 be managed. This releases the export pointer reference, and returns
1128 the export handle, so the export refcount is 1 when this function
1130 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1131 struct obd_uuid *cluuid)
1133 struct obd_export *export;
1134 LASSERT(conn != NULL);
1135 LASSERT(obd != NULL);
1136 LASSERT(cluuid != NULL);
1139 export = class_new_export(obd, cluuid);
1141 RETURN(PTR_ERR(export));
1143 conn->cookie = export->exp_handle.h_cookie;
1144 class_export_put(export);
1146 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1147 cluuid->uuid, conn->cookie);
1150 EXPORT_SYMBOL(class_connect);
1152 /* if export is involved in recovery then clean up related things */
1153 void class_export_recovery_cleanup(struct obd_export *exp)
1155 struct obd_device *obd = exp->exp_obd;
1157 spin_lock(&obd->obd_recovery_task_lock);
1158 if (exp->exp_delayed)
1159 obd->obd_delayed_clients--;
1160 if (obd->obd_recovering) {
1161 if (exp->exp_in_recovery) {
1162 spin_lock(&exp->exp_lock);
1163 exp->exp_in_recovery = 0;
1164 spin_unlock(&exp->exp_lock);
1165 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1166 cfs_atomic_dec(&obd->obd_connected_clients);
1169 /* if called during recovery then should update
1170 * obd_stale_clients counter,
1171 * lightweight exports are not counted */
1172 if (exp->exp_failed &&
1173 (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1174 exp->exp_obd->obd_stale_clients++;
1176 spin_unlock(&obd->obd_recovery_task_lock);
1177 /** Cleanup req replay fields */
1178 if (exp->exp_req_replay_needed) {
1179 spin_lock(&exp->exp_lock);
1180 exp->exp_req_replay_needed = 0;
1181 spin_unlock(&exp->exp_lock);
1182 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1183 cfs_atomic_dec(&obd->obd_req_replay_clients);
1185 /** Cleanup lock replay data */
1186 if (exp->exp_lock_replay_needed) {
1187 spin_lock(&exp->exp_lock);
1188 exp->exp_lock_replay_needed = 0;
1189 spin_unlock(&exp->exp_lock);
1190 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1191 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1195 /* This function removes 1-3 references from the export:
1196 * 1 - for export pointer passed
1197 * and if disconnect really need
1198 * 2 - removing from hash
1199 * 3 - in client_unlink_export
1200 * The export pointer passed to this function can destroyed */
1201 int class_disconnect(struct obd_export *export)
1203 int already_disconnected;
1206 if (export == NULL) {
1207 CWARN("attempting to free NULL export %p\n", export);
1211 spin_lock(&export->exp_lock);
1212 already_disconnected = export->exp_disconnected;
1213 export->exp_disconnected = 1;
1214 spin_unlock(&export->exp_lock);
1216 /* class_cleanup(), abort_recovery(), and class_fail_export()
1217 * all end up in here, and if any of them race we shouldn't
1218 * call extra class_export_puts(). */
1219 if (already_disconnected) {
1220 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1221 GOTO(no_disconn, already_disconnected);
1224 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1225 export->exp_handle.h_cookie);
1227 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1228 cfs_hash_del(export->exp_obd->obd_nid_hash,
1229 &export->exp_connection->c_peer.nid,
1230 &export->exp_nid_hash);
1232 class_export_recovery_cleanup(export);
1233 class_unlink_export(export);
1235 class_export_put(export);
1238 EXPORT_SYMBOL(class_disconnect);
1240 /* Return non-zero for a fully connected export */
1241 int class_connected_export(struct obd_export *exp)
1245 spin_lock(&exp->exp_lock);
1246 connected = (exp->exp_conn_cnt > 0);
1247 spin_unlock(&exp->exp_lock);
1252 EXPORT_SYMBOL(class_connected_export);
1254 static void class_disconnect_export_list(cfs_list_t *list,
1255 enum obd_option flags)
1258 struct obd_export *exp;
1261 /* It's possible that an export may disconnect itself, but
1262 * nothing else will be added to this list. */
1263 while (!cfs_list_empty(list)) {
1264 exp = cfs_list_entry(list->next, struct obd_export,
1266 /* need for safe call CDEBUG after obd_disconnect */
1267 class_export_get(exp);
1269 spin_lock(&exp->exp_lock);
1270 exp->exp_flags = flags;
1271 spin_unlock(&exp->exp_lock);
1273 if (obd_uuid_equals(&exp->exp_client_uuid,
1274 &exp->exp_obd->obd_uuid)) {
1276 "exp %p export uuid == obd uuid, don't discon\n",
1278 /* Need to delete this now so we don't end up pointing
1279 * to work_list later when this export is cleaned up. */
1280 cfs_list_del_init(&exp->exp_obd_chain);
1281 class_export_put(exp);
1285 class_export_get(exp);
1286 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1287 "last request at "CFS_TIME_T"\n",
1288 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1289 exp, exp->exp_last_request_time);
1290 /* release one export reference anyway */
1291 rc = obd_disconnect(exp);
1293 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1294 obd_export_nid2str(exp), exp, rc);
1295 class_export_put(exp);
1300 void class_disconnect_exports(struct obd_device *obd)
1302 cfs_list_t work_list;
1305 /* Move all of the exports from obd_exports to a work list, en masse. */
1306 CFS_INIT_LIST_HEAD(&work_list);
1307 spin_lock(&obd->obd_dev_lock);
1308 cfs_list_splice_init(&obd->obd_exports, &work_list);
1309 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1310 spin_unlock(&obd->obd_dev_lock);
1312 if (!cfs_list_empty(&work_list)) {
1313 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1314 "disconnecting them\n", obd->obd_minor, obd);
1315 class_disconnect_export_list(&work_list,
1316 exp_flags_from_obd(obd));
1318 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1319 obd->obd_minor, obd);
1322 EXPORT_SYMBOL(class_disconnect_exports);
1324 /* Remove exports that have not completed recovery.
1326 void class_disconnect_stale_exports(struct obd_device *obd,
1327 int (*test_export)(struct obd_export *))
1329 cfs_list_t work_list;
1330 struct obd_export *exp, *n;
1334 CFS_INIT_LIST_HEAD(&work_list);
1335 spin_lock(&obd->obd_dev_lock);
1336 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1338 /* don't count self-export as client */
1339 if (obd_uuid_equals(&exp->exp_client_uuid,
1340 &exp->exp_obd->obd_uuid))
1343 /* don't evict clients which have no slot in last_rcvd
1344 * (e.g. lightweight connection) */
1345 if (exp->exp_target_data.ted_lr_idx == -1)
1348 spin_lock(&exp->exp_lock);
1349 if (exp->exp_failed || test_export(exp)) {
1350 spin_unlock(&exp->exp_lock);
1353 exp->exp_failed = 1;
1354 spin_unlock(&exp->exp_lock);
1356 cfs_list_move(&exp->exp_obd_chain, &work_list);
1358 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1359 obd->obd_name, exp->exp_client_uuid.uuid,
1360 exp->exp_connection == NULL ? "<unknown>" :
1361 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1362 print_export_data(exp, "EVICTING", 0);
1364 spin_unlock(&obd->obd_dev_lock);
1367 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1368 obd->obd_name, evicted);
1370 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1371 OBD_OPT_ABORT_RECOV);
1374 EXPORT_SYMBOL(class_disconnect_stale_exports);
1376 void class_fail_export(struct obd_export *exp)
1378 int rc, already_failed;
1380 spin_lock(&exp->exp_lock);
1381 already_failed = exp->exp_failed;
1382 exp->exp_failed = 1;
1383 spin_unlock(&exp->exp_lock);
1385 if (already_failed) {
1386 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1387 exp, exp->exp_client_uuid.uuid);
1391 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1392 exp, exp->exp_client_uuid.uuid);
1394 if (obd_dump_on_timeout)
1395 libcfs_debug_dumplog();
1397 /* need for safe call CDEBUG after obd_disconnect */
1398 class_export_get(exp);
1400 /* Most callers into obd_disconnect are removing their own reference
1401 * (request, for example) in addition to the one from the hash table.
1402 * We don't have such a reference here, so make one. */
1403 class_export_get(exp);
1404 rc = obd_disconnect(exp);
1406 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1408 CDEBUG(D_HA, "disconnected export %p/%s\n",
1409 exp, exp->exp_client_uuid.uuid);
1410 class_export_put(exp);
1412 EXPORT_SYMBOL(class_fail_export);
1414 char *obd_export_nid2str(struct obd_export *exp)
1416 if (exp->exp_connection != NULL)
1417 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1421 EXPORT_SYMBOL(obd_export_nid2str);
1423 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1425 cfs_hash_t *nid_hash;
1426 struct obd_export *doomed_exp = NULL;
1427 int exports_evicted = 0;
1429 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1431 spin_lock(&obd->obd_dev_lock);
1432 /* umount has run already, so evict thread should leave
1433 * its task to umount thread now */
1434 if (obd->obd_stopping) {
1435 spin_unlock(&obd->obd_dev_lock);
1436 return exports_evicted;
1438 nid_hash = obd->obd_nid_hash;
1439 cfs_hash_getref(nid_hash);
1440 spin_unlock(&obd->obd_dev_lock);
1443 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1444 if (doomed_exp == NULL)
1447 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1448 "nid %s found, wanted nid %s, requested nid %s\n",
1449 obd_export_nid2str(doomed_exp),
1450 libcfs_nid2str(nid_key), nid);
1451 LASSERTF(doomed_exp != obd->obd_self_export,
1452 "self-export is hashed by NID?\n");
1454 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1455 "request\n", obd->obd_name,
1456 obd_uuid2str(&doomed_exp->exp_client_uuid),
1457 obd_export_nid2str(doomed_exp));
1458 class_fail_export(doomed_exp);
1459 class_export_put(doomed_exp);
1462 cfs_hash_putref(nid_hash);
1464 if (!exports_evicted)
1465 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1466 obd->obd_name, nid);
1467 return exports_evicted;
1469 EXPORT_SYMBOL(obd_export_evict_by_nid);
1471 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1473 cfs_hash_t *uuid_hash;
1474 struct obd_export *doomed_exp = NULL;
1475 struct obd_uuid doomed_uuid;
1476 int exports_evicted = 0;
1478 spin_lock(&obd->obd_dev_lock);
1479 if (obd->obd_stopping) {
1480 spin_unlock(&obd->obd_dev_lock);
1481 return exports_evicted;
1483 uuid_hash = obd->obd_uuid_hash;
1484 cfs_hash_getref(uuid_hash);
1485 spin_unlock(&obd->obd_dev_lock);
1487 obd_str2uuid(&doomed_uuid, uuid);
1488 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1489 CERROR("%s: can't evict myself\n", obd->obd_name);
1490 cfs_hash_putref(uuid_hash);
1491 return exports_evicted;
1494 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1496 if (doomed_exp == NULL) {
1497 CERROR("%s: can't disconnect %s: no exports found\n",
1498 obd->obd_name, uuid);
1500 CWARN("%s: evicting %s at adminstrative request\n",
1501 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1502 class_fail_export(doomed_exp);
1503 class_export_put(doomed_exp);
1506 cfs_hash_putref(uuid_hash);
1508 return exports_evicted;
1510 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1512 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1513 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1514 EXPORT_SYMBOL(class_export_dump_hook);
1517 static void print_export_data(struct obd_export *exp, const char *status,
1520 struct ptlrpc_reply_state *rs;
1521 struct ptlrpc_reply_state *first_reply = NULL;
1524 spin_lock(&exp->exp_lock);
1525 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1531 spin_unlock(&exp->exp_lock);
1533 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1534 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1535 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1536 cfs_atomic_read(&exp->exp_rpc_count),
1537 cfs_atomic_read(&exp->exp_cb_count),
1538 cfs_atomic_read(&exp->exp_locks_count),
1539 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1540 nreplies, first_reply, nreplies > 3 ? "..." : "",
1541 exp->exp_last_committed);
1542 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1543 if (locks && class_export_dump_hook != NULL)
1544 class_export_dump_hook(exp);
1548 void dump_exports(struct obd_device *obd, int locks)
1550 struct obd_export *exp;
1552 spin_lock(&obd->obd_dev_lock);
1553 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1554 print_export_data(exp, "ACTIVE", locks);
1555 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1556 print_export_data(exp, "UNLINKED", locks);
1557 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1558 print_export_data(exp, "DELAYED", locks);
1559 spin_unlock(&obd->obd_dev_lock);
1560 spin_lock(&obd_zombie_impexp_lock);
1561 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1562 print_export_data(exp, "ZOMBIE", locks);
1563 spin_unlock(&obd_zombie_impexp_lock);
1565 EXPORT_SYMBOL(dump_exports);
1567 void obd_exports_barrier(struct obd_device *obd)
1570 LASSERT(cfs_list_empty(&obd->obd_exports));
1571 spin_lock(&obd->obd_dev_lock);
1572 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1573 spin_unlock(&obd->obd_dev_lock);
1574 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1575 cfs_time_seconds(waited));
1576 if (waited > 5 && IS_PO2(waited)) {
1577 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1578 "more than %d seconds. "
1579 "The obd refcount = %d. Is it stuck?\n",
1580 obd->obd_name, waited,
1581 cfs_atomic_read(&obd->obd_refcount));
1582 dump_exports(obd, 1);
1585 spin_lock(&obd->obd_dev_lock);
1587 spin_unlock(&obd->obd_dev_lock);
1589 EXPORT_SYMBOL(obd_exports_barrier);
1591 /* Total amount of zombies to be destroyed */
1592 static int zombies_count = 0;
1595 * kill zombie imports and exports
1597 void obd_zombie_impexp_cull(void)
1599 struct obd_import *import;
1600 struct obd_export *export;
1604 spin_lock(&obd_zombie_impexp_lock);
1607 if (!cfs_list_empty(&obd_zombie_imports)) {
1608 import = cfs_list_entry(obd_zombie_imports.next,
1611 cfs_list_del_init(&import->imp_zombie_chain);
1615 if (!cfs_list_empty(&obd_zombie_exports)) {
1616 export = cfs_list_entry(obd_zombie_exports.next,
1619 cfs_list_del_init(&export->exp_obd_chain);
1622 spin_unlock(&obd_zombie_impexp_lock);
1624 if (import != NULL) {
1625 class_import_destroy(import);
1626 spin_lock(&obd_zombie_impexp_lock);
1628 spin_unlock(&obd_zombie_impexp_lock);
1631 if (export != NULL) {
1632 class_export_destroy(export);
1633 spin_lock(&obd_zombie_impexp_lock);
1635 spin_unlock(&obd_zombie_impexp_lock);
1639 } while (import != NULL || export != NULL);
1643 static struct completion obd_zombie_start;
1644 static struct completion obd_zombie_stop;
1645 static unsigned long obd_zombie_flags;
1646 static cfs_waitq_t obd_zombie_waitq;
1647 static pid_t obd_zombie_pid;
1650 OBD_ZOMBIE_STOP = 0x0001,
1654 * check for work for kill zombie import/export thread.
1656 static int obd_zombie_impexp_check(void *arg)
1660 spin_lock(&obd_zombie_impexp_lock);
1661 rc = (zombies_count == 0) &&
1662 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1663 spin_unlock(&obd_zombie_impexp_lock);
1669 * Add export to the obd_zombe thread and notify it.
1671 static void obd_zombie_export_add(struct obd_export *exp) {
1672 spin_lock(&exp->exp_obd->obd_dev_lock);
1673 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1674 cfs_list_del_init(&exp->exp_obd_chain);
1675 spin_unlock(&exp->exp_obd->obd_dev_lock);
1676 spin_lock(&obd_zombie_impexp_lock);
1678 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1679 spin_unlock(&obd_zombie_impexp_lock);
1681 obd_zombie_impexp_notify();
1685 * Add import to the obd_zombe thread and notify it.
1687 static void obd_zombie_import_add(struct obd_import *imp) {
1688 LASSERT(imp->imp_sec == NULL);
1689 LASSERT(imp->imp_rq_pool == NULL);
1690 spin_lock(&obd_zombie_impexp_lock);
1691 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1693 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1694 spin_unlock(&obd_zombie_impexp_lock);
1696 obd_zombie_impexp_notify();
1700 * notify import/export destroy thread about new zombie.
1702 static void obd_zombie_impexp_notify(void)
1705 * Make sure obd_zomebie_impexp_thread get this notification.
1706 * It is possible this signal only get by obd_zombie_barrier, and
1707 * barrier gulps this notification and sleeps away and hangs ensues
1709 cfs_waitq_broadcast(&obd_zombie_waitq);
1713 * check whether obd_zombie is idle
1715 static int obd_zombie_is_idle(void)
1719 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1720 spin_lock(&obd_zombie_impexp_lock);
1721 rc = (zombies_count == 0);
1722 spin_unlock(&obd_zombie_impexp_lock);
1727 * wait when obd_zombie import/export queues become empty
1729 void obd_zombie_barrier(void)
1731 struct l_wait_info lwi = { 0 };
1733 if (obd_zombie_pid == cfs_curproc_pid())
1734 /* don't wait for myself */
1736 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1738 EXPORT_SYMBOL(obd_zombie_barrier);
1743 * destroy zombie export/import thread.
1745 static int obd_zombie_impexp_thread(void *unused)
1749 rc = cfs_daemonize_ctxt("obd_zombid");
1751 complete(&obd_zombie_start);
1755 complete(&obd_zombie_start);
1757 obd_zombie_pid = cfs_curproc_pid();
1759 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1760 struct l_wait_info lwi = { 0 };
1762 l_wait_event(obd_zombie_waitq,
1763 !obd_zombie_impexp_check(NULL), &lwi);
1764 obd_zombie_impexp_cull();
1767 * Notify obd_zombie_barrier callers that queues
1770 cfs_waitq_signal(&obd_zombie_waitq);
1773 complete(&obd_zombie_stop);
1778 #else /* ! KERNEL */
1780 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1781 static void *obd_zombie_impexp_work_cb;
1782 static void *obd_zombie_impexp_idle_cb;
1784 int obd_zombie_impexp_kill(void *arg)
1788 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1789 obd_zombie_impexp_cull();
1792 cfs_atomic_dec(&zombie_recur);
1799 * start destroy zombie import/export thread
1801 int obd_zombie_impexp_init(void)
1805 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1806 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1807 spin_lock_init(&obd_zombie_impexp_lock);
1808 init_completion(&obd_zombie_start);
1809 init_completion(&obd_zombie_stop);
1810 cfs_waitq_init(&obd_zombie_waitq);
1814 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1818 wait_for_completion(&obd_zombie_start);
1821 obd_zombie_impexp_work_cb =
1822 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1823 &obd_zombie_impexp_kill, NULL);
1825 obd_zombie_impexp_idle_cb =
1826 liblustre_register_idle_callback("obd_zombi_impexp_check",
1827 &obd_zombie_impexp_check, NULL);
1833 * stop destroy zombie import/export thread
1835 void obd_zombie_impexp_stop(void)
1837 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1838 obd_zombie_impexp_notify();
1840 wait_for_completion(&obd_zombie_stop);
1842 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1843 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1847 /***** Kernel-userspace comm helpers *******/
1849 /* Get length of entire message, including header */
1850 int kuc_len(int payload_len)
1852 return sizeof(struct kuc_hdr) + payload_len;
1854 EXPORT_SYMBOL(kuc_len);
1856 /* Get a pointer to kuc header, given a ptr to the payload
1857 * @param p Pointer to payload area
1858 * @returns Pointer to kuc header
1860 struct kuc_hdr * kuc_ptr(void *p)
1862 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1863 LASSERT(lh->kuc_magic == KUC_MAGIC);
1866 EXPORT_SYMBOL(kuc_ptr);
1868 /* Test if payload is part of kuc message
1869 * @param p Pointer to payload area
1872 int kuc_ispayload(void *p)
1874 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1876 if (kh->kuc_magic == KUC_MAGIC)
1881 EXPORT_SYMBOL(kuc_ispayload);
1883 /* Alloc space for a message, and fill in header
1884 * @return Pointer to payload area
1886 void *kuc_alloc(int payload_len, int transport, int type)
1889 int len = kuc_len(payload_len);
1893 return ERR_PTR(-ENOMEM);
1895 lh->kuc_magic = KUC_MAGIC;
1896 lh->kuc_transport = transport;
1897 lh->kuc_msgtype = type;
1898 lh->kuc_msglen = len;
1900 return (void *)(lh + 1);
1902 EXPORT_SYMBOL(kuc_alloc);
1904 /* Takes pointer to payload area */
1905 inline void kuc_free(void *p, int payload_len)
1907 struct kuc_hdr *lh = kuc_ptr(p);
1908 OBD_FREE(lh, kuc_len(payload_len));
1910 EXPORT_SYMBOL(kuc_free);