4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include <obd_class.h>
44 #include <lprocfs_status.h>
/* Taken around walks and updates of the obd_types list (see
 * class_search_type() and class_register_type()). */
46 spinlock_t obd_types_lock;
/* Slab caches; obd_init_caches() creates them and obd_cleanup_caches()
 * destroys them. */
48 struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 struct kmem_cache *import_cachep;
/* Zombie import/export lists and their lock.
 * NOTE(review): presumably these hold objects whose last reference was
 * dropped and that await destruction -- confirm against the
 * obd_zombie_*_add() helpers. */
53 struct list_head obd_zombie_imports;
54 struct list_head obd_zombie_exports;
55 spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of static helpers; obd_zombie_export_add() and
 * obd_zombie_import_add() are called from class_export_put() and
 * class_import_put() respectively. */
56 static void obd_zombie_impexp_notify(void);
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 static void print_export_data(struct obd_export *exp,
60 const char *status, int locks);
/* Function pointer invoked by class_export_destroy() and
 * class_import_destroy() to release a ptlrpc_connection.
 * NOTE(review): presumably installed by another module -- confirm. */
62 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
63 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
66 * support functions: we could use inter-module communication, but this
67 * is more portable to other OS's
69 static struct obd_device *obd_device_alloc(void)
71 struct obd_device *obd;
73 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
75 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Give an obd_device back to obd_device_cachep.  The device magic is
 * asserted first, and a namespace that is still attached is reported
 * through CERROR.
 */
80 static void obd_device_free(struct obd_device *obd)
83 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
84 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* a namespace left behind means the device was not fully cleaned up */
85 if (obd->obd_namespace != NULL) {
86 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
87 obd, obd->obd_namespace, obd->obd_force);
/* tear down reference tracking, then release the slab object */
90 lu_ref_fini(&obd->obd_reference);
91 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
94 struct obd_type *class_search_type(const char *name)
96 struct list_head *tmp;
97 struct obd_type *type;
99 spin_lock(&obd_types_lock);
100 list_for_each(tmp, &obd_types) {
101 type = list_entry(tmp, struct obd_type, typ_chain);
102 if (strcmp(type->typ_name, name) == 0) {
103 spin_unlock(&obd_types_lock);
107 spin_unlock(&obd_types_lock);
110 EXPORT_SYMBOL(class_search_type);
/*
 * Look up an obd_type by name and take a module reference on the owner of
 * its dt_ops.  When module loading support is compiled in, the lookup is
 * retried after request_module() has loaded a module.
 */
112 struct obd_type *class_get_type(const char *name)
114 struct obd_type *type = class_search_type(name);
116 #ifdef HAVE_MODULE_LOADING_SUPPORT
/* the module to load may be named differently from the type */
118 const char *modname = name;
120 if (strcmp(modname, "obdfilter") == 0)
/* an LWP type is served by the OSP module */
123 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
124 modname = LUSTRE_OSP_NAME;
/* any name starting with the MDS name maps to the MDT module */
126 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
127 modname = LUSTRE_MDT_NAME;
/* request_module() returning 0 means the load succeeded; search again
 * under the original type name */
129 if (!request_module("%s", modname)) {
130 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131 type = class_search_type(name);
133 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
/* pin the module that owns the type's dt_ops, under the per-type lock */
139 spin_lock(&type->obd_type_lock);
141 try_module_get(type->typ_dt_ops->o_owner);
142 spin_unlock(&type->obd_type_lock);
146 EXPORT_SYMBOL(class_get_type);
/*
 * Release the module reference on the owner of the type's dt_ops; the
 * counterpart of the try_module_get() in class_get_type().  Done under the
 * per-type obd_type_lock.
 */
148 void class_put_type(struct obd_type *type)
151 spin_lock(&type->obd_type_lock);
153 module_put(type->typ_dt_ops->o_owner);
154 spin_unlock(&type->obd_type_lock);
156 EXPORT_SYMBOL(class_put_type);
158 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type under \a name.
 *
 * Allocates an obd_type, copies the caller's dt_ops (and md_ops, which is
 * optional) into it, registers a proc root, initialises the lu device type
 * and finally links the type into obd_types.  On failure everything that
 * was allocated is released again.
 */
160 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
161 bool enable_proc, struct lprocfs_seq_vars *module_vars,
162 #ifndef HAVE_ONLY_PROCFS_SEQ
163 struct lprocfs_vars *vars,
165 const char *name, struct lu_device_type *ldt)
167 struct obd_type *type;
/* the name must be NUL-terminated within CLASS_MAX_NAME bytes */
172 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* refuse to register the same type name twice */
174 if (class_search_type(name)) {
175 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
180 OBD_ALLOC(type, sizeof(*type));
/* private copies of the ops tables and of the name */
184 OBD_ALLOC_PTR(type->typ_dt_ops);
185 OBD_ALLOC_PTR(type->typ_md_ops);
186 OBD_ALLOC(type->typ_name, strlen(name) + 1);
188 if (type->typ_dt_ops == NULL ||
189 type->typ_md_ops == NULL ||
190 type->typ_name == NULL)
193 *(type->typ_dt_ops) = *dt_ops;
194 /* md_ops is optional */
196 *(type->typ_md_ops) = *md_ops;
/* typ_name was allocated with strlen(name) + 1 bytes above */
197 strcpy(type->typ_name, name);
198 spin_lock_init(&type->obd_type_lock);
202 #ifndef HAVE_ONLY_PROCFS_SEQ
204 type->typ_procroot = lprocfs_register(type->typ_name,
210 type->typ_procroot = lprocfs_seq_register(type->typ_name,
/* a failed proc registration is turned into an error code; the pointer
 * is cleared so that the cleanup path does not try to remove it */
214 if (IS_ERR(type->typ_procroot)) {
215 rc = PTR_ERR(type->typ_procroot);
216 type->typ_procroot = NULL;
223 rc = lu_device_type_init(ldt);
/* publish the fully initialised type */
228 spin_lock(&obd_types_lock);
229 list_add(&type->typ_chain, &obd_types);
230 spin_unlock(&obd_types_lock);
/* failure path: undo proc registration and free whatever was allocated */
235 if (type->typ_name != NULL) {
237 if (type->typ_procroot != NULL) {
238 #ifndef HAVE_ONLY_PROCFS_SEQ
239 lprocfs_try_remove_proc_entry(type->typ_name,
242 remove_proc_subtree(type->typ_name, proc_lustre_root);
246 OBD_FREE(type->typ_name, strlen(name) + 1);
248 if (type->typ_md_ops != NULL)
249 OBD_FREE_PTR(type->typ_md_ops);
250 if (type->typ_dt_ops != NULL)
251 OBD_FREE_PTR(type->typ_dt_ops);
252 OBD_FREE(type, sizeof(*type));
255 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name: remove its proc entries,
 * finalise its lu device type, unlink it from obd_types and free it.
 * A type that still has references only loses its ops tables.
 */
257 int class_unregister_type(const char *name)
259 struct obd_type *type = class_search_type(name);
263 CERROR("unknown obd type\n");
/* still referenced: the type cannot be torn down completely */
267 if (type->typ_refcnt) {
268 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
269 /* This is a bad situation, let's make the best of it */
270 /* Remove ops, but leave the name for debugging */
271 OBD_FREE_PTR(type->typ_dt_ops);
272 OBD_FREE_PTR(type->typ_md_ops);
276 /* we do not use type->typ_procroot as for compatibility purposes
277 * other modules can share names (i.e. lod can use lov entry). so
278 * we can't reference pointer as it can get invalided when another
279 * module removes the entry */
/* hence the proc entry is removed by name rather than through the pointer */
281 if (type->typ_procroot != NULL) {
282 #ifndef HAVE_ONLY_PROCFS_SEQ
283 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
285 remove_proc_subtree(type->typ_name, proc_lustre_root);
289 if (type->typ_procsym != NULL)
290 lprocfs_remove(&type->typ_procsym);
293 lu_device_type_fini(type->typ_lu);
/* unlink from the global list before freeing */
295 spin_lock(&obd_types_lock);
296 list_del(&type->typ_chain);
297 spin_unlock(&obd_types_lock);
/* name equals typ_name here (the type was found by strcmp), so
 * strlen(name) + 1 matches the size allocated at registration */
298 OBD_FREE(type->typ_name, strlen(name) + 1);
299 if (type->typ_dt_ops != NULL)
300 OBD_FREE_PTR(type->typ_dt_ops);
301 if (type->typ_md_ops != NULL)
302 OBD_FREE_PTR(type->typ_md_ops);
303 OBD_FREE(type, sizeof(*type));
305 } /* class_unregister_type */
306 EXPORT_SYMBOL(class_unregister_type);
/**
 * Create a new obd device.
 *
 * Find an empty slot in ::obd_devs[], create a new obd device in it.
 *
 * \param[in] type_name	obd device type string.
 * \param[in] name	obd device name.
 *
 * \retval ERR_PTR(errno) if create fails, otherwise return the obd device
 */
/*
 * Allocate a new obd device of type \a type_name, claim the first free
 * slot in obd_devs[] for it and record \a name in it.  Failures are
 * reported as ERR_PTR() values (-EINVAL, -ENODEV, -ENOMEM, -EEXIST,
 * -EOVERFLOW).
 */
319 struct obd_device *class_newdev(const char *type_name, const char *name)
321 struct obd_device *result = NULL;
322 struct obd_device *newdev;
323 struct obd_type *type = NULL;
325 int new_obd_minor = 0;
/* the name has to fit into obd_name including its terminator */
328 if (strlen(name) >= MAX_OBD_NAME) {
329 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
330 RETURN(ERR_PTR(-EINVAL));
/* class_get_type() takes a type reference; it is dropped again through
 * class_put_type() at the end of this function */
333 type = class_get_type(type_name);
335 CERROR("OBD: unknown type: %s\n", type_name);
336 RETURN(ERR_PTR(-ENODEV));
339 newdev = obd_device_alloc();
341 GOTO(out_type, result = ERR_PTR(-ENOMEM));
343 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* one pass over every slot under the write lock: detect a duplicate
 * name and claim the first empty slot */
345 write_lock(&obd_dev_lock);
346 for (i = 0; i < class_devno_max(); i++) {
347 struct obd_device *obd = class_num2obd(i);
349 if (obd && (strcmp(name, obd->obd_name) == 0)) {
350 CERROR("Device %s already exists at %d, won't add\n",
353 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
354 "%p obd_magic %08x != %08x\n", result,
355 result->obd_magic, OBD_DEVICE_MAGIC);
356 LASSERTF(result->obd_minor == new_obd_minor,
357 "%p obd_minor %d != %d\n", result,
358 result->obd_minor, new_obd_minor);
/* give back the slot that was claimed earlier in this scan */
360 obd_devs[result->obd_minor] = NULL;
361 result->obd_name[0]='\0';
363 result = ERR_PTR(-EEXIST);
/* first empty slot seen while nothing has been claimed yet */
366 if (!result && !obd) {
368 result->obd_minor = i;
370 result->obd_type = type;
/* strlen(name) < MAX_OBD_NAME was checked above.
 * NOTE(review): termination relies on obd_name being zero-filled by the
 * allocator -- confirm OBD_SLAB_ALLOC_PTR_GFP zeroes the object. */
371 strncpy(result->obd_name, name,
372 sizeof(result->obd_name) - 1);
373 obd_devs[i] = result;
376 write_unlock(&obd_dev_lock);
/* the scan completed without finding a free slot */
378 if (result == NULL && i >= class_devno_max()) {
379 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
381 GOTO(out, result = ERR_PTR(-EOVERFLOW));
387 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
388 result->obd_name, result);
/* error unwinding: free the device, then drop the type reference */
392 obd_device_free(newdev);
394 class_put_type(type);
398 void class_release_dev(struct obd_device *obd)
400 struct obd_type *obd_type = obd->obd_type;
402 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
403 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
404 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
405 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
406 LASSERT(obd_type != NULL);
408 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
409 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
411 write_lock(&obd_dev_lock);
412 obd_devs[obd->obd_minor] = NULL;
413 write_unlock(&obd_dev_lock);
414 obd_device_free(obd);
416 class_put_type(obd_type);
419 int class_name2dev(const char *name)
426 read_lock(&obd_dev_lock);
427 for (i = 0; i < class_devno_max(); i++) {
428 struct obd_device *obd = class_num2obd(i);
430 if (obd && strcmp(name, obd->obd_name) == 0) {
431 /* Make sure we finished attaching before we give
432 out any references */
433 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
434 if (obd->obd_attached) {
435 read_unlock(&obd_dev_lock);
441 read_unlock(&obd_dev_lock);
445 EXPORT_SYMBOL(class_name2dev);
447 struct obd_device *class_name2obd(const char *name)
449 int dev = class_name2dev(name);
451 if (dev < 0 || dev > class_devno_max())
453 return class_num2obd(dev);
455 EXPORT_SYMBOL(class_name2obd);
457 int class_uuid2dev(struct obd_uuid *uuid)
461 read_lock(&obd_dev_lock);
462 for (i = 0; i < class_devno_max(); i++) {
463 struct obd_device *obd = class_num2obd(i);
465 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
466 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
467 read_unlock(&obd_dev_lock);
471 read_unlock(&obd_dev_lock);
475 EXPORT_SYMBOL(class_uuid2dev);
477 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
479 int dev = class_uuid2dev(uuid);
482 return class_num2obd(dev);
484 EXPORT_SYMBOL(class_uuid2obd);
487 * Get obd device from ::obd_devs[]
489 * \param num [in] array index
491 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
492 * otherwise return the obd device there.
494 struct obd_device *class_num2obd(int num)
496 struct obd_device *obd = NULL;
498 if (num < class_devno_max()) {
503 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
504 "%p obd_magic %08x != %08x\n",
505 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
506 LASSERTF(obd->obd_minor == num,
507 "%p obd_minor %0d != %0d\n",
508 obd, obd->obd_minor, num);
513 EXPORT_SYMBOL(class_num2obd);
516 * Get obd devices count. Device in any
518 * \retval obd device count
520 int get_devices_count(void)
522 int index, max_index = class_devno_max(), dev_count = 0;
524 read_lock(&obd_dev_lock);
525 for (index = 0; index <= max_index; index++) {
526 struct obd_device *obd = class_num2obd(index);
530 read_unlock(&obd_dev_lock);
534 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per obd device: index, status, type name,
 * device name, uuid and reference count.  The table is walked under the
 * obd_dev_lock read lock.
 */
536 void class_obd_list(void)
541 read_lock(&obd_dev_lock);
542 for (i = 0; i < class_devno_max(); i++) {
543 struct obd_device *obd = class_num2obd(i);
/* status is chosen by precedence: stopping, then set up, then attached */
547 if (obd->obd_stopping)
549 else if (obd->obd_set_up)
551 else if (obd->obd_attached)
555 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
556 i, status, obd->obd_type->typ_name,
557 obd->obd_name, obd->obd_uuid.uuid,
558 atomic_read(&obd->obd_refcount));
560 read_unlock(&obd_dev_lock);
564 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
565 specified, then only the client with that uuid is returned,
566 otherwise any client connected to the tgt is returned. */
567 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
568 const char * typ_name,
569 struct obd_uuid *grp_uuid)
573 read_lock(&obd_dev_lock);
574 for (i = 0; i < class_devno_max(); i++) {
575 struct obd_device *obd = class_num2obd(i);
579 if ((strncmp(obd->obd_type->typ_name, typ_name,
580 strlen(typ_name)) == 0)) {
581 if (obd_uuid_equals(tgt_uuid,
582 &obd->u.cli.cl_target_uuid) &&
583 ((grp_uuid)? obd_uuid_equals(grp_uuid,
584 &obd->obd_uuid) : 1)) {
585 read_unlock(&obd_dev_lock);
590 read_unlock(&obd_dev_lock);
594 EXPORT_SYMBOL(class_find_client_obd);
596 /* Iterate the obd_device list looking devices have grp_uuid. Start
597 searching at *next, and if a device is found, the next index to look
598 at is saved in *next. If next is NULL, then the first matching device
599 will always be returned. */
600 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
606 else if (*next >= 0 && *next < class_devno_max())
611 read_lock(&obd_dev_lock);
612 for (; i < class_devno_max(); i++) {
613 struct obd_device *obd = class_num2obd(i);
617 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
620 read_unlock(&obd_dev_lock);
624 read_unlock(&obd_dev_lock);
628 EXPORT_SYMBOL(class_devices_in_group);
/**
 * Notify that the sptlrpc log for \a fsname has changed, so that every
 * relevant OBD adjusts its sptlrpc settings accordingly.
 */
/*
 * Send a KEY_SPTLRPC_CONF set_info to the self export of every set-up,
 * non-stopping mdc/osc/mdt/ost device whose name begins with the first
 * \a namelen bytes of \a fsname.
 */
634 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
636 struct obd_device *obd;
640 LASSERT(namelen > 0);
642 read_lock(&obd_dev_lock);
643 for (i = 0; i < class_devno_max(); i++) {
644 obd = class_num2obd(i);
/* skip empty slots and devices that are not set up or are stopping */
646 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
649 /* only notify mdc, osc, mdt, ost */
650 type = obd->obd_type->typ_name;
651 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
652 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
653 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
654 strcmp(type, LUSTRE_OST_NAME) != 0)
/* the device name must start with the filesystem name */
657 if (strncmp(obd->obd_name, fsname, namelen))
/* take a device reference, then drop the table lock around the
 * set_info call; the lock is retaken before the scan continues */
660 class_incref(obd, __FUNCTION__, obd);
661 read_unlock(&obd_dev_lock);
662 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
663 sizeof(KEY_SPTLRPC_CONF),
664 KEY_SPTLRPC_CONF, 0, NULL, NULL);
666 class_decref(obd, __FUNCTION__, obd);
667 read_lock(&obd_dev_lock);
669 read_unlock(&obd_dev_lock);
672 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches set up by obd_init_caches(): the obd_device,
 * obdo, import and capa caches.  obd_init_caches() also calls this on its
 * own failure path.
 */
674 void obd_cleanup_caches(void)
/* the pointer is cleared after destruction so that obd_init_caches()
 * can assert it is NULL again */
677 if (obd_device_cachep) {
678 kmem_cache_destroy(obd_device_cachep);
679 obd_device_cachep = NULL;
682 kmem_cache_destroy(obdo_cachep);
686 kmem_cache_destroy(import_cachep);
687 import_cachep = NULL;
690 kmem_cache_destroy(capa_cachep);
/*
 * Create the slab caches used by this file: "ll_obd_dev_cache",
 * "ll_obdo_cache", "ll_import_cache" and "capa_cache".  Each cache pointer
 * is asserted to be NULL beforehand.  A failed creation jumps to the
 * cleanup path with rc = -ENOMEM.
 */
696 int obd_init_caches(void)
701 LASSERT(obd_device_cachep == NULL);
702 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
703 sizeof(struct obd_device),
705 if (!obd_device_cachep)
706 GOTO(out, rc = -ENOMEM);
708 LASSERT(obdo_cachep == NULL);
709 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
712 GOTO(out, rc = -ENOMEM);
714 LASSERT(import_cachep == NULL);
715 import_cachep = kmem_cache_create("ll_import_cache",
716 sizeof(struct obd_import),
719 GOTO(out, rc = -ENOMEM);
721 LASSERT(capa_cachep == NULL);
722 capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
725 GOTO(out, rc = -ENOMEM);
/* failure path: tear down whichever caches were already created */
729 obd_cleanup_caches();
733 /* map connection to client */
734 struct obd_export *class_conn2export(struct lustre_handle *conn)
736 struct obd_export *export;
740 CDEBUG(D_CACHE, "looking for null handle\n");
744 if (conn->cookie == -1) { /* this means assign a new connection */
745 CDEBUG(D_CACHE, "want a new connection\n");
749 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
750 export = class_handle2object(conn->cookie, NULL);
753 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd device.
 * NOTE(review): the body is not visible in this view; presumably it
 * returns exp->exp_obd -- confirm against the full source.
 */
755 struct obd_device *class_exp2obd(struct obd_export *exp)
761 EXPORT_SYMBOL(class_exp2obd);
763 struct obd_device *class_conn2obd(struct lustre_handle *conn)
765 struct obd_export *export;
766 export = class_conn2export(conn);
768 struct obd_device *obd = export->exp_obd;
769 class_export_put(export);
774 EXPORT_SYMBOL(class_conn2obd);
776 struct obd_import *class_exp2cliimp(struct obd_export *exp)
778 struct obd_device *obd = exp->exp_obd;
781 return obd->u.cli.cl_import;
783 EXPORT_SYMBOL(class_exp2cliimp);
785 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
787 struct obd_device *obd = class_conn2obd(conn);
790 return obd->u.cli.cl_import;
792 EXPORT_SYMBOL(class_conn2cliimp);
794 /* Export management functions */
/*
 * Final teardown of an export whose reference count has reached zero:
 * drop its connection, run obd_destroy_export(), release the "export"
 * reference on the obd device and free the export via OBD_FREE_RCU.
 */
795 static void class_export_destroy(struct obd_export *exp)
797 struct obd_device *obd = exp->exp_obd;
800 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
801 LASSERT(obd != NULL);
803 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
804 exp->exp_client_uuid.uuid, obd->obd_name);
806 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
807 if (exp->exp_connection)
808 ptlrpc_put_connection_superhack(exp->exp_connection);
/* no reply or request may still be queued on the export */
810 LASSERT(list_empty(&exp->exp_outstanding_replies));
811 LASSERT(list_empty(&exp->exp_uncommitted_replies));
812 LASSERT(list_empty(&exp->exp_req_replay_queue));
813 LASSERT(list_empty(&exp->exp_hp_rpcs));
814 obd_destroy_export(exp);
/* pairs with class_incref(obd, "export", export) in class_new_export() */
815 class_decref(obd, "export", exp);
817 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle callback: take an export reference through class_export_get(). */
821 static void export_handle_addref(void *export)
823 class_export_get(export);
/* Handle operations passed to class_handle_hash() in class_new_export(). */
826 static struct portals_handle_ops export_handle_ops = {
827 .hop_addref = export_handle_addref,
/*
 * Take one reference on \a exp by incrementing exp_refcount, and log the
 * new count.
 */
831 struct obd_export *class_export_get(struct obd_export *exp)
833 atomic_inc(&exp->exp_refcount);
834 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
835 atomic_read(&exp->exp_refcount));
838 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp.  When the last reference goes away the
 * export's nid stats are cleaned up and the export is handed to
 * obd_zombie_export_add().
 */
840 void class_export_put(struct obd_export *exp)
842 LASSERT(exp != NULL);
/* the count must be positive and must not look like poisoned memory */
843 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* the value logged is the count after this put */
844 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
845 atomic_read(&exp->exp_refcount) - 1);
847 if (atomic_dec_and_test(&exp->exp_refcount)) {
/* the export is expected to still sit on one of the obd's export lists */
848 LASSERT(!list_empty(&exp->exp_obd_chain));
849 CDEBUG(D_IOCTL, "final put %p/%s\n",
850 exp, exp->exp_client_uuid.uuid);
852 /* release nid stat refererence */
853 lprocfs_exp_cleanup(exp);
855 obd_zombie_export_add(exp);
858 EXPORT_SYMBOL(class_export_put);
860 /* Creates a new export, adds it to the hash table, and returns a
861 * pointer to it. The refcount is 2: one for the hash reference, and
862 * one for the pointer returned by this function. */
/*
 * Allocate and initialise an export of \a obd for client \a cluuid, add it
 * to the obd's uuid hash (unless \a cluuid is the obd's own uuid) and link
 * it onto the obd's export lists.  Failures are returned as ERR_PTR()
 * (-ENOMEM on allocation failure; -ENODEV and -EALREADY are set on the
 * error paths below).
 */
863 struct obd_export *class_new_export(struct obd_device *obd,
864 struct obd_uuid *cluuid)
866 struct obd_export *export;
867 cfs_hash_t *hash = NULL;
871 OBD_ALLOC_PTR(export);
873 return ERR_PTR(-ENOMEM);
875 export->exp_conn_cnt = 0;
876 export->exp_lock_hash = NULL;
877 export->exp_flock_hash = NULL;
/* two references: see the comment above this function */
878 atomic_set(&export->exp_refcount, 2);
879 atomic_set(&export->exp_rpc_count, 0);
880 atomic_set(&export->exp_cb_count, 0);
881 atomic_set(&export->exp_locks_count, 0);
882 #if LUSTRE_TRACKS_LOCK_EXP_REFS
883 INIT_LIST_HEAD(&export->exp_locks_list);
884 spin_lock_init(&export->exp_locks_list_guard);
886 atomic_set(&export->exp_replay_count, 0);
887 export->exp_obd = obd;
888 INIT_LIST_HEAD(&export->exp_outstanding_replies);
889 spin_lock_init(&export->exp_uncommitted_replies_lock);
890 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
891 INIT_LIST_HEAD(&export->exp_req_replay_queue);
892 INIT_LIST_HEAD(&export->exp_handle.h_link);
893 INIT_LIST_HEAD(&export->exp_hp_rpcs);
894 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* register the export's handle with export_handle_ops */
895 class_handle_hash(&export->exp_handle, &export_handle_ops);
896 export->exp_last_request_time = cfs_time_current_sec();
897 spin_lock_init(&export->exp_lock);
898 spin_lock_init(&export->exp_rpc_lock);
899 INIT_HLIST_NODE(&export->exp_uuid_hash);
900 INIT_HLIST_NODE(&export->exp_nid_hash);
901 spin_lock_init(&export->exp_bl_list_lock);
902 INIT_LIST_HEAD(&export->exp_bl_list);
904 export->exp_sp_peer = LUSTRE_SP_ANY;
905 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
906 export->exp_client_uuid = *cluuid;
907 obd_init_export(export);
/* first check under obd_dev_lock: refuse a device that is shutting
 * down, and take a reference on its uuid hash */
909 spin_lock(&obd->obd_dev_lock);
910 /* shouldn't happen, but might race */
911 if (obd->obd_stopping)
912 GOTO(exit_unlock, rc = -ENODEV);
914 hash = cfs_hash_getref(obd->obd_uuid_hash);
916 GOTO(exit_unlock, rc = -ENODEV);
917 spin_unlock(&obd->obd_dev_lock);
/* an export whose client uuid is the obd's own uuid is not hashed */
919 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
920 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
922 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
923 obd->obd_name, cluuid->uuid, rc);
924 GOTO(exit_err, rc = -EALREADY);
/* second check: the device may have started stopping while the lock
 * was dropped for the hash insertion */
928 spin_lock(&obd->obd_dev_lock);
929 if (obd->obd_stopping) {
930 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
931 GOTO(exit_unlock, rc = -ENODEV);
/* success: pin the device and link the export onto its lists */
934 class_incref(obd, "export", export);
935 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
936 list_add_tail(&export->exp_obd_chain_timed,
937 &export->exp_obd->obd_exports_timed);
938 export->exp_obd->obd_num_exports++;
939 spin_unlock(&obd->obd_dev_lock);
940 cfs_hash_putref(hash);
/* error unwinding: unlock if needed, drop the hash reference, unhash
 * the handle and free the partly constructed export */
944 spin_unlock(&obd->obd_dev_lock);
947 cfs_hash_putref(hash);
948 class_handle_unhash(&export->exp_handle);
949 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
950 obd_destroy_export(export);
951 OBD_FREE_PTR(export);
954 EXPORT_SYMBOL(class_new_export);
956 void class_unlink_export(struct obd_export *exp)
958 class_handle_unhash(&exp->exp_handle);
960 spin_lock(&exp->exp_obd->obd_dev_lock);
961 /* delete an uuid-export hashitem from hashtables */
962 if (!hlist_unhashed(&exp->exp_uuid_hash))
963 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
964 &exp->exp_client_uuid,
965 &exp->exp_uuid_hash);
967 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
968 list_del_init(&exp->exp_obd_chain_timed);
969 exp->exp_obd->obd_num_exports--;
970 spin_unlock(&exp->exp_obd->obd_dev_lock);
971 class_export_put(exp);
973 EXPORT_SYMBOL(class_unlink_export);
975 /* Import management functions */
/*
 * Final teardown of an import whose reference count has reached zero:
 * release its connections, free every entry on imp_conn_list, drop the
 * "import" reference on the obd device and free the import via
 * OBD_FREE_RCU.
 */
976 void class_import_destroy(struct obd_import *imp)
980 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
981 imp->imp_obd->obd_name);
983 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
985 ptlrpc_put_connection_superhack(imp->imp_connection);
/* drain the connection list: unlink each entry, release its
 * connection and free it */
987 while (!list_empty(&imp->imp_conn_list)) {
988 struct obd_import_conn *imp_conn;
990 imp_conn = list_entry(imp->imp_conn_list.next,
991 struct obd_import_conn, oic_item);
992 list_del_init(&imp_conn->oic_item);
993 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
994 OBD_FREE(imp_conn, sizeof(*imp_conn));
997 LASSERT(imp->imp_sec == NULL);
/* pairs with class_incref(obd, "import", imp) in class_new_import() */
998 class_decref(imp->imp_obd, "import", imp);
999 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Handle callback: take an import reference through class_import_get(). */
1003 static void import_handle_addref(void *import)
1005 class_import_get(import);
/* Handle operations passed to class_handle_hash() in class_new_import(). */
1008 static struct portals_handle_ops import_handle_ops = {
1009 .hop_addref = import_handle_addref,
/*
 * Take one reference on \a import by incrementing imp_refcount, and log
 * the new count together with the obd name.
 */
1013 struct obd_import *class_import_get(struct obd_import *import)
1015 atomic_inc(&import->imp_refcount);
1016 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1017 atomic_read(&import->imp_refcount),
1018 import->imp_obd->obd_name);
1021 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp.  When the last reference goes away the
 * import is handed to obd_zombie_import_add().
 */
1023 void class_import_put(struct obd_import *imp)
/* an import that is already on the zombie chain must not be put again */
1027 LASSERT(list_empty(&imp->imp_zombie_chain));
1028 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* the value logged is the count after this put */
1030 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1031 atomic_read(&imp->imp_refcount) - 1,
1032 imp->imp_obd->obd_name);
1034 if (atomic_dec_and_test(&imp->imp_refcount)) {
1035 CDEBUG(D_INFO, "final put import %p\n", imp);
1036 obd_zombie_import_add(imp);
1039 /* catch possible import put race */
1040 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1043 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate and one service estimate per portal (IMP_AT_MAX_PORTALS of
 * them), each seeded with INITIAL_CONNECT_TIMEOUT.
 */
1045 static void init_imp_at(struct imp_at *at) {
1047 at_init(&at->iat_net_latency, 0, 0);
1048 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1049 /* max service estimates are tracked on the server side, so
1050 don't use the AT history here, just use the last reported
1051 val. (But keep hist for proc histogram, worst_ever) */
1052 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialise an import for \a obd.  The import starts in
 * state LUSTRE_IMP_NEW with a reference count of 2, holds an "import"
 * reference on the obd device and has its handle hashed with
 * import_handle_ops.
 */
1057 struct obd_import *class_new_import(struct obd_device *obd)
1059 struct obd_import *imp;
1061 OBD_ALLOC(imp, sizeof(*imp));
1065 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1066 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1067 INIT_LIST_HEAD(&imp->imp_replay_list);
1068 INIT_LIST_HEAD(&imp->imp_sending_list);
1069 INIT_LIST_HEAD(&imp->imp_delayed_list);
1070 INIT_LIST_HEAD(&imp->imp_committed_list);
/* the replay cursor starts at the head of the (empty) committed list */
1071 imp->imp_replay_cursor = &imp->imp_committed_list;
1072 spin_lock_init(&imp->imp_lock);
1073 imp->imp_last_success_conn = 0;
1074 imp->imp_state = LUSTRE_IMP_NEW;
/* dropped again by class_decref() in class_import_destroy() */
1075 imp->imp_obd = class_incref(obd, "import", imp);
1076 mutex_init(&imp->imp_sec_mutex);
1077 init_waitqueue_head(&imp->imp_recovery_waitq);
/* NOTE(review): presumably one reference for the handle hash and one
 * for the returned pointer, mirroring class_new_export() -- confirm */
1079 atomic_set(&imp->imp_refcount, 2);
1080 atomic_set(&imp->imp_unregistering, 0);
1081 atomic_set(&imp->imp_inflight, 0);
1082 atomic_set(&imp->imp_replay_inflight, 0);
1083 atomic_set(&imp->imp_inval_count, 0);
1084 INIT_LIST_HEAD(&imp->imp_conn_list);
1085 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1086 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1087 init_imp_at(&imp->imp_at);
1089 /* the default magic is V2, will be used in connect RPC, and
1090 * then adjusted according to the flags in request/reply. */
1091 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1095 EXPORT_SYMBOL(class_new_import);
/*
 * Start destroying an import: unhash its handle, bump imp_generation
 * under imp_lock and drop one import reference.
 */
1097 void class_destroy_import(struct obd_import *import)
1099 LASSERT(import != NULL);
1100 LASSERT(import != LP_POISON);
1102 class_handle_unhash(&import->imp_handle);
1104 spin_lock(&import->imp_lock);
1105 import->imp_generation++;
1106 spin_unlock(&import->imp_lock);
1107 class_import_put(import);
1109 EXPORT_SYMBOL(class_destroy_import);
1111 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1113 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1115 spin_lock(&exp->exp_locks_list_guard);
1117 LASSERT(lock->l_exp_refs_nr >= 0);
1119 if (lock->l_exp_refs_target != NULL &&
1120 lock->l_exp_refs_target != exp) {
1121 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1122 exp, lock, lock->l_exp_refs_target);
1124 if ((lock->l_exp_refs_nr ++) == 0) {
1125 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1126 lock->l_exp_refs_target = exp;
1128 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1129 lock, exp, lock->l_exp_refs_nr);
1130 spin_unlock(&exp->exp_locks_list_guard);
1132 EXPORT_SYMBOL(__class_export_add_lock_ref);
1134 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1136 spin_lock(&exp->exp_locks_list_guard);
1137 LASSERT(lock->l_exp_refs_nr > 0);
1138 if (lock->l_exp_refs_target != exp) {
1139 LCONSOLE_WARN("lock %p, "
1140 "mismatching export pointers: %p, %p\n",
1141 lock, lock->l_exp_refs_target, exp);
1143 if (-- lock->l_exp_refs_nr == 0) {
1144 list_del_init(&lock->l_exp_refs_link);
1145 lock->l_exp_refs_target = NULL;
1147 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1148 lock, exp, lock->l_exp_refs_nr);
1149 spin_unlock(&exp->exp_locks_list_guard);
1151 EXPORT_SYMBOL(__class_export_del_lock_ref);
1154 /* A connection defines an export context in which preallocation can
1155 be managed. This releases the export pointer reference, and returns
1156 the export handle, so the export refcount is 1 when this function
1158 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1159 struct obd_uuid *cluuid)
1161 struct obd_export *export;
1162 LASSERT(conn != NULL);
1163 LASSERT(obd != NULL);
1164 LASSERT(cluuid != NULL);
1167 export = class_new_export(obd, cluuid);
1169 RETURN(PTR_ERR(export));
1171 conn->cookie = export->exp_handle.h_cookie;
1172 class_export_put(export);
1174 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1175 cluuid->uuid, conn->cookie);
1178 EXPORT_SYMBOL(class_connect);
1180 /* if export is involved in recovery then clean up related things */
/*
 * Undo the recovery accounting of an export: the connected-clients and
 * stale-clients counters of the obd (only while the obd is recovering),
 * and the request/lock replay client counters.
 */
1181 void class_export_recovery_cleanup(struct obd_export *exp)
1183 struct obd_device *obd = exp->exp_obd;
/* the obd-wide recovery state is protected by obd_recovery_task_lock */
1185 spin_lock(&obd->obd_recovery_task_lock);
1186 if (obd->obd_recovering) {
1187 if (exp->exp_in_recovery) {
/* the per-export flag is cleared under the export's own lock */
1188 spin_lock(&exp->exp_lock);
1189 exp->exp_in_recovery = 0;
1190 spin_unlock(&exp->exp_lock);
1191 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1192 atomic_dec(&obd->obd_connected_clients);
1195 /* if called during recovery then should update
1196 * obd_stale_clients counter,
1197 * lightweight exports are not counted */
1198 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1199 exp->exp_obd->obd_stale_clients++;
1201 spin_unlock(&obd->obd_recovery_task_lock);
1203 spin_lock(&exp->exp_lock);
1204 /** Cleanup req replay fields */
1205 if (exp->exp_req_replay_needed) {
1206 exp->exp_req_replay_needed = 0;
/* the counter must be non-zero before it is decremented */
1208 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1209 atomic_dec(&obd->obd_req_replay_clients);
1212 /** Cleanup lock replay data */
1213 if (exp->exp_lock_replay_needed) {
1214 exp->exp_lock_replay_needed = 0;
1216 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1217 atomic_dec(&obd->obd_lock_replay_clients);
1219 spin_unlock(&exp->exp_lock);
1222 /* This function removes 1-3 references from the export:
1223 * 1 - for export pointer passed
1224 * and if disconnect really need
1225 * 2 - removing from hash
1226 * 3 - in client_unlink_export
1227 * The export pointer passed to this function can destroyed */
/*
 * Disconnect an export.  The first caller marks the export disconnected,
 * removes it from the nid hash, cleans up its recovery state and unlinks
 * it; later callers skip that work.  One reference on the export is
 * dropped through class_export_put() near the end of the function.
 */
1228 int class_disconnect(struct obd_export *export)
1230 int already_disconnected;
1233 if (export == NULL) {
1234 CWARN("attempting to free NULL export %p\n", export);
/* test-and-set of exp_disconnected under exp_lock, so that exactly one
 * caller performs the actual disconnect */
1238 spin_lock(&export->exp_lock);
1239 already_disconnected = export->exp_disconnected;
1240 export->exp_disconnected = 1;
1241 spin_unlock(&export->exp_lock);
1243 /* class_cleanup(), abort_recovery(), and class_fail_export()
1244 * all end up in here, and if any of them race we shouldn't
1245 * call extra class_export_puts(). */
1246 if (already_disconnected) {
1247 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1248 GOTO(no_disconn, already_disconnected);
1251 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1252 export->exp_handle.h_cookie);
/* remove the export from the nid hash if it is hashed there */
1254 if (!hlist_unhashed(&export->exp_nid_hash))
1255 cfs_hash_del(export->exp_obd->obd_nid_hash,
1256 &export->exp_connection->c_peer.nid,
1257 &export->exp_nid_hash);
1259 class_export_recovery_cleanup(export);
1260 class_unlink_export(export);
/* releases the reference that came with the export pointer passed in */
1262 class_export_put(export);
1265 EXPORT_SYMBOL(class_disconnect);
1267 /* Return non-zero for a fully connected export */
/*
 * Report whether an export is fully connected: it has a positive
 * connection count and has not failed.  Both fields are sampled under
 * exp_lock.
 */
1268 int class_connected_export(struct obd_export *exp)
1273 spin_lock(&exp->exp_lock);
1274 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1275 spin_unlock(&exp->exp_lock);
1279 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list.  Each export gets \a flags stored
 * in exp_flags first.  An export whose client uuid equals the obd's own
 * uuid is only taken off the list; every other export is passed to
 * obd_disconnect().
 */
1281 static void class_disconnect_export_list(struct list_head *list,
1282 enum obd_option flags)
1285 struct obd_export *exp;
1288 /* It's possible that an export may disconnect itself, but
1289 * nothing else will be added to this list. */
1290 while (!list_empty(list)) {
1291 exp = list_entry(list->next, struct obd_export,
1293 /* need for safe call CDEBUG after obd_disconnect */
1294 class_export_get(exp);
1296 spin_lock(&exp->exp_lock);
1297 exp->exp_flags = flags;
1298 spin_unlock(&exp->exp_lock);
/* the obd's self export: do not disconnect it */
1300 if (obd_uuid_equals(&exp->exp_client_uuid,
1301 &exp->exp_obd->obd_uuid)) {
1303 "exp %p export uuid == obd uuid, don't discon\n",
1305 /* Need to delete this now so we don't end up pointing
1306 * to work_list later when this export is cleaned up. */
1307 list_del_init(&exp->exp_obd_chain);
1308 class_export_put(exp);
/* second reference: obd_disconnect() consumes one (see below), the
 * other keeps exp valid for the CDEBUG that follows */
1312 class_export_get(exp);
1313 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1314 "last request at "CFS_TIME_T"\n",
1315 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1316 exp, exp->exp_last_request_time);
1317 /* release one export reference anyway */
1318 rc = obd_disconnect(exp);
1320 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1321 obd_export_nid2str(exp), exp, rc);
1322 class_export_put(exp);
1327 void class_disconnect_exports(struct obd_device *obd)
1329 struct list_head work_list;
1332 /* Move all of the exports from obd_exports to a work list, en masse. */
1333 INIT_LIST_HEAD(&work_list);
1334 spin_lock(&obd->obd_dev_lock);
1335 list_splice_init(&obd->obd_exports, &work_list);
1336 list_splice_init(&obd->obd_delayed_exports, &work_list);
1337 spin_unlock(&obd->obd_dev_lock);
1339 if (!list_empty(&work_list)) {
1340 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1341 "disconnecting them\n", obd->obd_minor, obd);
1342 class_disconnect_export_list(&work_list,
1343 exp_flags_from_obd(obd));
1345 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1346 obd->obd_minor, obd);
1349 EXPORT_SYMBOL(class_disconnect_exports);
1351 /* Remove exports that have not completed recovery.
1353 void class_disconnect_stale_exports(struct obd_device *obd,
1354 int (*test_export)(struct obd_export *))
1356 struct list_head work_list;
1357 struct obd_export *exp, *n;
1361 INIT_LIST_HEAD(&work_list);
1362 spin_lock(&obd->obd_dev_lock);
1363 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1365 /* don't count self-export as client */
1366 if (obd_uuid_equals(&exp->exp_client_uuid,
1367 &exp->exp_obd->obd_uuid))
1370 /* don't evict clients which have no slot in last_rcvd
1371 * (e.g. lightweight connection) */
1372 if (exp->exp_target_data.ted_lr_idx == -1)
1375 spin_lock(&exp->exp_lock);
1376 if (exp->exp_failed || test_export(exp)) {
1377 spin_unlock(&exp->exp_lock);
1380 exp->exp_failed = 1;
1381 spin_unlock(&exp->exp_lock);
1383 list_move(&exp->exp_obd_chain, &work_list);
1385 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1386 obd->obd_name, exp->exp_client_uuid.uuid,
1387 exp->exp_connection == NULL ? "<unknown>" :
1388 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1389 print_export_data(exp, "EVICTING", 0);
1391 spin_unlock(&obd->obd_dev_lock);
1394 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1395 obd->obd_name, evicted);
1397 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1398 OBD_OPT_ABORT_RECOV);
1401 EXPORT_SYMBOL(class_disconnect_stale_exports);
1403 void class_fail_export(struct obd_export *exp)
1405 int rc, already_failed;
1407 spin_lock(&exp->exp_lock);
1408 already_failed = exp->exp_failed;
1409 exp->exp_failed = 1;
1410 spin_unlock(&exp->exp_lock);
1412 if (already_failed) {
1413 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1414 exp, exp->exp_client_uuid.uuid);
1418 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1419 exp, exp->exp_client_uuid.uuid);
1421 if (obd_dump_on_timeout)
1422 libcfs_debug_dumplog();
1424 /* need for safe call CDEBUG after obd_disconnect */
1425 class_export_get(exp);
1427 /* Most callers into obd_disconnect are removing their own reference
1428 * (request, for example) in addition to the one from the hash table.
1429 * We don't have such a reference here, so make one. */
1430 class_export_get(exp);
1431 rc = obd_disconnect(exp);
1433 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1435 CDEBUG(D_HA, "disconnected export %p/%s\n",
1436 exp, exp->exp_client_uuid.uuid);
1437 class_export_put(exp);
1439 EXPORT_SYMBOL(class_fail_export);
1441 char *obd_export_nid2str(struct obd_export *exp)
1443 if (exp->exp_connection != NULL)
1444 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1448 EXPORT_SYMBOL(obd_export_nid2str);
1450 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1452 cfs_hash_t *nid_hash;
1453 struct obd_export *doomed_exp = NULL;
1454 int exports_evicted = 0;
1456 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1458 spin_lock(&obd->obd_dev_lock);
1459 /* umount has run already, so evict thread should leave
1460 * its task to umount thread now */
1461 if (obd->obd_stopping) {
1462 spin_unlock(&obd->obd_dev_lock);
1463 return exports_evicted;
1465 nid_hash = obd->obd_nid_hash;
1466 cfs_hash_getref(nid_hash);
1467 spin_unlock(&obd->obd_dev_lock);
1470 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1471 if (doomed_exp == NULL)
1474 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1475 "nid %s found, wanted nid %s, requested nid %s\n",
1476 obd_export_nid2str(doomed_exp),
1477 libcfs_nid2str(nid_key), nid);
1478 LASSERTF(doomed_exp != obd->obd_self_export,
1479 "self-export is hashed by NID?\n");
1481 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1482 "request\n", obd->obd_name,
1483 obd_uuid2str(&doomed_exp->exp_client_uuid),
1484 obd_export_nid2str(doomed_exp));
1485 class_fail_export(doomed_exp);
1486 class_export_put(doomed_exp);
1489 cfs_hash_putref(nid_hash);
1491 if (!exports_evicted)
1492 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1493 obd->obd_name, nid);
1494 return exports_evicted;
1496 EXPORT_SYMBOL(obd_export_evict_by_nid);
1498 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1500 cfs_hash_t *uuid_hash;
1501 struct obd_export *doomed_exp = NULL;
1502 struct obd_uuid doomed_uuid;
1503 int exports_evicted = 0;
1505 spin_lock(&obd->obd_dev_lock);
1506 if (obd->obd_stopping) {
1507 spin_unlock(&obd->obd_dev_lock);
1508 return exports_evicted;
1510 uuid_hash = obd->obd_uuid_hash;
1511 cfs_hash_getref(uuid_hash);
1512 spin_unlock(&obd->obd_dev_lock);
1514 obd_str2uuid(&doomed_uuid, uuid);
1515 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1516 CERROR("%s: can't evict myself\n", obd->obd_name);
1517 cfs_hash_putref(uuid_hash);
1518 return exports_evicted;
1521 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1523 if (doomed_exp == NULL) {
1524 CERROR("%s: can't disconnect %s: no exports found\n",
1525 obd->obd_name, uuid);
1527 CWARN("%s: evicting %s at adminstrative request\n",
1528 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1529 class_fail_export(doomed_exp);
1530 class_export_put(doomed_exp);
1533 cfs_hash_putref(uuid_hash);
1535 return exports_evicted;
1537 EXPORT_SYMBOL(obd_export_evict_by_uuid);
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback; print_export_data() invokes it when
 * lock dumping is requested and the hook is non-NULL.  File-scope objects
 * start out zeroed, so the hook is NULL until something assigns it. */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1544 static void print_export_data(struct obd_export *exp, const char *status,
1547 struct ptlrpc_reply_state *rs;
1548 struct ptlrpc_reply_state *first_reply = NULL;
1551 spin_lock(&exp->exp_lock);
1552 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1558 spin_unlock(&exp->exp_lock);
1560 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1561 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1562 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1563 atomic_read(&exp->exp_rpc_count),
1564 atomic_read(&exp->exp_cb_count),
1565 atomic_read(&exp->exp_locks_count),
1566 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1567 nreplies, first_reply, nreplies > 3 ? "..." : "",
1568 exp->exp_last_committed);
1569 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1570 if (locks && class_export_dump_hook != NULL)
1571 class_export_dump_hook(exp);
1575 void dump_exports(struct obd_device *obd, int locks)
1577 struct obd_export *exp;
1579 spin_lock(&obd->obd_dev_lock);
1580 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1581 print_export_data(exp, "ACTIVE", locks);
1582 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1583 print_export_data(exp, "UNLINKED", locks);
1584 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1585 print_export_data(exp, "DELAYED", locks);
1586 spin_unlock(&obd->obd_dev_lock);
1587 spin_lock(&obd_zombie_impexp_lock);
1588 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1589 print_export_data(exp, "ZOMBIE", locks);
1590 spin_unlock(&obd_zombie_impexp_lock);
1592 EXPORT_SYMBOL(dump_exports);
1594 void obd_exports_barrier(struct obd_device *obd)
1597 LASSERT(list_empty(&obd->obd_exports));
1598 spin_lock(&obd->obd_dev_lock);
1599 while (!list_empty(&obd->obd_unlinked_exports)) {
1600 spin_unlock(&obd->obd_dev_lock);
1601 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1602 cfs_time_seconds(waited));
1603 if (waited > 5 && IS_PO2(waited)) {
1604 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1605 "more than %d seconds. "
1606 "The obd refcount = %d. Is it stuck?\n",
1607 obd->obd_name, waited,
1608 atomic_read(&obd->obd_refcount));
1609 dump_exports(obd, 1);
1612 spin_lock(&obd->obd_dev_lock);
1614 spin_unlock(&obd->obd_dev_lock);
1616 EXPORT_SYMBOL(obd_exports_barrier);
/* Total amount of zombies (imports plus exports) still to be destroyed.
 * Modified and read under obd_zombie_impexp_lock.  File-scope statics are
 * zero-initialized, so the count starts at 0. */
static int zombies_count;
1622 * kill zombie imports and exports
1624 void obd_zombie_impexp_cull(void)
1626 struct obd_import *import;
1627 struct obd_export *export;
1631 spin_lock(&obd_zombie_impexp_lock);
1634 if (!list_empty(&obd_zombie_imports)) {
1635 import = list_entry(obd_zombie_imports.next,
1638 list_del_init(&import->imp_zombie_chain);
1642 if (!list_empty(&obd_zombie_exports)) {
1643 export = list_entry(obd_zombie_exports.next,
1646 list_del_init(&export->exp_obd_chain);
1649 spin_unlock(&obd_zombie_impexp_lock);
1651 if (import != NULL) {
1652 class_import_destroy(import);
1653 spin_lock(&obd_zombie_impexp_lock);
1655 spin_unlock(&obd_zombie_impexp_lock);
1658 if (export != NULL) {
1659 class_export_destroy(export);
1660 spin_lock(&obd_zombie_impexp_lock);
1662 spin_unlock(&obd_zombie_impexp_lock);
1666 } while (import != NULL || export != NULL);
1670 static struct completion obd_zombie_start;
1671 static struct completion obd_zombie_stop;
1672 static unsigned long obd_zombie_flags;
1673 static wait_queue_head_t obd_zombie_waitq;
1674 static pid_t obd_zombie_pid;
1677 OBD_ZOMBIE_STOP = 0x0001,
1681 * check for work for kill zombie import/export thread.
1683 static int obd_zombie_impexp_check(void *arg)
1687 spin_lock(&obd_zombie_impexp_lock);
1688 rc = (zombies_count == 0) &&
1689 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1690 spin_unlock(&obd_zombie_impexp_lock);
1696 * Add export to the obd_zombe thread and notify it.
1698 static void obd_zombie_export_add(struct obd_export *exp) {
1699 spin_lock(&exp->exp_obd->obd_dev_lock);
1700 LASSERT(!list_empty(&exp->exp_obd_chain));
1701 list_del_init(&exp->exp_obd_chain);
1702 spin_unlock(&exp->exp_obd->obd_dev_lock);
1703 spin_lock(&obd_zombie_impexp_lock);
1705 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1706 spin_unlock(&obd_zombie_impexp_lock);
1708 obd_zombie_impexp_notify();
1712 * Add import to the obd_zombe thread and notify it.
1714 static void obd_zombie_import_add(struct obd_import *imp) {
1715 LASSERT(imp->imp_sec == NULL);
1716 LASSERT(imp->imp_rq_pool == NULL);
1717 spin_lock(&obd_zombie_impexp_lock);
1718 LASSERT(list_empty(&imp->imp_zombie_chain));
1720 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1721 spin_unlock(&obd_zombie_impexp_lock);
1723 obd_zombie_impexp_notify();
1727 * notify import/export destroy thread about new zombie.
1729 static void obd_zombie_impexp_notify(void)
1732 * Make sure obd_zomebie_impexp_thread get this notification.
1733 * It is possible this signal only get by obd_zombie_barrier, and
1734 * barrier gulps this notification and sleeps away and hangs ensues
1736 wake_up_all(&obd_zombie_waitq);
1740 * check whether obd_zombie is idle
1742 static int obd_zombie_is_idle(void)
1746 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1747 spin_lock(&obd_zombie_impexp_lock);
1748 rc = (zombies_count == 0);
1749 spin_unlock(&obd_zombie_impexp_lock);
1754 * wait when obd_zombie import/export queues become empty
1756 void obd_zombie_barrier(void)
1758 struct l_wait_info lwi = { 0 };
1760 if (obd_zombie_pid == current_pid())
1761 /* don't wait for myself */
1763 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1765 EXPORT_SYMBOL(obd_zombie_barrier);
1769 * destroy zombie export/import thread.
1771 static int obd_zombie_impexp_thread(void *unused)
1773 unshare_fs_struct();
1774 complete(&obd_zombie_start);
1776 obd_zombie_pid = current_pid();
1778 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1779 struct l_wait_info lwi = { 0 };
1781 l_wait_event(obd_zombie_waitq,
1782 !obd_zombie_impexp_check(NULL), &lwi);
1783 obd_zombie_impexp_cull();
1786 * Notify obd_zombie_barrier callers that queues
1789 wake_up(&obd_zombie_waitq);
1792 complete(&obd_zombie_stop);
1799 * start destroy zombie import/export thread
1801 int obd_zombie_impexp_init(void)
1803 struct task_struct *task;
1805 INIT_LIST_HEAD(&obd_zombie_imports);
1807 INIT_LIST_HEAD(&obd_zombie_exports);
1808 spin_lock_init(&obd_zombie_impexp_lock);
1809 init_completion(&obd_zombie_start);
1810 init_completion(&obd_zombie_stop);
1811 init_waitqueue_head(&obd_zombie_waitq);
1814 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1816 RETURN(PTR_ERR(task));
1818 wait_for_completion(&obd_zombie_start);
1822 * stop destroy zombie import/export thread
1824 void obd_zombie_impexp_stop(void)
1826 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1827 obd_zombie_impexp_notify();
1828 wait_for_completion(&obd_zombie_stop);
1831 /***** Kernel-userspace comm helpers *******/
1833 /* Get length of entire message, including header */
1834 int kuc_len(int payload_len)
1836 return sizeof(struct kuc_hdr) + payload_len;
1838 EXPORT_SYMBOL(kuc_len);
1840 /* Get a pointer to kuc header, given a ptr to the payload
1841 * @param p Pointer to payload area
1842 * @returns Pointer to kuc header
1844 struct kuc_hdr * kuc_ptr(void *p)
1846 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1847 LASSERT(lh->kuc_magic == KUC_MAGIC);
1850 EXPORT_SYMBOL(kuc_ptr);
1852 /* Test if payload is part of kuc message
1853 * @param p Pointer to payload area
1856 int kuc_ispayload(void *p)
1858 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1860 if (kh->kuc_magic == KUC_MAGIC)
1865 EXPORT_SYMBOL(kuc_ispayload);
1867 /* Alloc space for a message, and fill in header
1868 * @return Pointer to payload area
1870 void *kuc_alloc(int payload_len, int transport, int type)
1873 int len = kuc_len(payload_len);
1877 return ERR_PTR(-ENOMEM);
1879 lh->kuc_magic = KUC_MAGIC;
1880 lh->kuc_transport = transport;
1881 lh->kuc_msgtype = type;
1882 lh->kuc_msglen = len;
1884 return (void *)(lh + 1);
1886 EXPORT_SYMBOL(kuc_alloc);
/* Takes pointer to payload area
 * @param p Pointer to payload area, as returned by kuc_alloc()
 * @param payload_len Payload length; used to compute the size to free
 */
inline void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);

	OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1896 struct obd_request_slot_waiter {
1897 struct list_head orsw_entry;
1898 wait_queue_head_t orsw_waitq;
1902 static bool obd_request_slot_avail(struct client_obd *cli,
1903 struct obd_request_slot_waiter *orsw)
1907 client_obd_list_lock(&cli->cl_loi_list_lock);
1908 avail = !!list_empty(&orsw->orsw_entry);
1909 client_obd_list_unlock(&cli->cl_loi_list_lock);
1915 * For network flow control, the RPC sponsor needs to acquire a credit
1916 * before sending the RPC. The credits count for a connection is defined
1917 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1918 * the subsequent RPC sponsors need to wait until others released their
1919 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1921 int obd_get_request_slot(struct client_obd *cli)
1923 struct obd_request_slot_waiter orsw;
1924 struct l_wait_info lwi;
1927 client_obd_list_lock(&cli->cl_loi_list_lock);
1928 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1929 cli->cl_r_in_flight++;
1930 client_obd_list_unlock(&cli->cl_loi_list_lock);
1934 init_waitqueue_head(&orsw.orsw_waitq);
1935 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1936 orsw.orsw_signaled = false;
1937 client_obd_list_unlock(&cli->cl_loi_list_lock);
1939 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1940 rc = l_wait_event(orsw.orsw_waitq,
1941 obd_request_slot_avail(cli, &orsw) ||
1945 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1946 * freed but other (such as obd_put_request_slot) is using it. */
1947 client_obd_list_lock(&cli->cl_loi_list_lock);
1949 if (!orsw.orsw_signaled) {
1950 if (list_empty(&orsw.orsw_entry))
1951 cli->cl_r_in_flight--;
1953 list_del(&orsw.orsw_entry);
1957 if (orsw.orsw_signaled) {
1958 LASSERT(list_empty(&orsw.orsw_entry));
1962 client_obd_list_unlock(&cli->cl_loi_list_lock);
1966 EXPORT_SYMBOL(obd_get_request_slot);
1968 void obd_put_request_slot(struct client_obd *cli)
1970 struct obd_request_slot_waiter *orsw;
1972 client_obd_list_lock(&cli->cl_loi_list_lock);
1973 cli->cl_r_in_flight--;
1975 /* If there is free slot, wakeup the first waiter. */
1976 if (!list_empty(&cli->cl_loi_read_list) &&
1977 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1978 orsw = list_entry(cli->cl_loi_read_list.next,
1979 struct obd_request_slot_waiter, orsw_entry);
1980 list_del_init(&orsw->orsw_entry);
1981 cli->cl_r_in_flight++;
1982 wake_up(&orsw->orsw_waitq);
1984 client_obd_list_unlock(&cli->cl_loi_list_lock);
1986 EXPORT_SYMBOL(obd_put_request_slot);
1988 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1990 return cli->cl_max_rpcs_in_flight;
1992 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1994 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1996 struct obd_request_slot_waiter *orsw;
2001 if (max > OBD_MAX_RIF_MAX || max < 1)
2004 client_obd_list_lock(&cli->cl_loi_list_lock);
2005 old = cli->cl_max_rpcs_in_flight;
2006 cli->cl_max_rpcs_in_flight = max;
2009 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2010 for (i = 0; i < diff; i++) {
2011 if (list_empty(&cli->cl_loi_read_list))
2014 orsw = list_entry(cli->cl_loi_read_list.next,
2015 struct obd_request_slot_waiter, orsw_entry);
2016 list_del_init(&orsw->orsw_entry);
2017 cli->cl_r_in_flight++;
2018 wake_up(&orsw->orsw_waitq);
2020 client_obd_list_unlock(&cli->cl_loi_list_lock);
2024 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);