4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
46 #include <obd_class.h>
47 #include <lprocfs_status.h>
/* Taken around every traversal and modification of the obd_types list. */
49 spinlock_t obd_types_lock;
/* Slab caches: created in obd_init_caches(), destroyed in obd_cleanup_caches(). */
51 struct kmem_cache *obd_device_cachep;
52 struct kmem_cache *obdo_cachep;
53 EXPORT_SYMBOL(obdo_cachep);
54 struct kmem_cache *import_cachep;
/* Zombie import/export bookkeeping. class_export_put() and class_import_put()
 * hand objects to obd_zombie_export_add() / obd_zombie_import_add() on the
 * final put; presumably those helpers queue onto these lists under
 * obd_zombie_impexp_lock -- they are defined outside this excerpt, confirm. */
56 struct list_head obd_zombie_imports;
57 struct list_head obd_zombie_exports;
58 spinlock_t obd_zombie_impexp_lock;
59 static void obd_zombie_impexp_notify(void);
60 static void obd_zombie_export_add(struct obd_export *exp);
61 static void obd_zombie_import_add(struct obd_import *imp);
62 static void print_export_data(struct obd_export *exp,
63 const char *status, int locks);
/* Function pointer used by class_export_destroy() and class_import_destroy()
 * to release a ptlrpc connection; where it gets assigned is not visible here. */
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69 * support functions: we could use inter-module communication, but this
70 * is more portable to other OS's
/*
 * Allocate an obd_device from the obd_device_cachep slab with GFP_NOFS and
 * stamp it with OBD_DEVICE_MAGIC.
 * NOTE(review): the allocation-failure check and the return statement sit on
 * lines missing from this excerpt -- confirm against the full file.
 */
72 static struct obd_device *obd_device_alloc(void)
74 struct obd_device *obd;
76 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to the obd_device_cachep slab.
 * The magic is asserted first, a still-attached namespace is reported, and
 * the obd_reference lu_ref is finalized before the memory is freed.
 */
83 static void obd_device_free(struct obd_device *obd)
86 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A leftover namespace is reported with CERROR; any further handling sits on
 * lines missing from this excerpt. */
88 if (obd->obd_namespace != NULL) {
89 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90 obd, obd->obd_namespace, obd->obd_force);
93 lu_ref_fini(&obd->obd_reference);
94 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up a registered obd_type by exact name.
 * Walks the obd_types list under obd_types_lock, comparing typ_name with
 * strcmp(); the lock is released both on a match and after a full scan.
 * NOTE(review): the return statements are not visible in this excerpt;
 * presumably the matching type or NULL is returned -- confirm.
 * No type reference is taken in the visible code.
 */
97 struct obd_type *class_search_type(const char *name)
99 struct list_head *tmp;
100 struct obd_type *type;
102 spin_lock(&obd_types_lock);
103 list_for_each(tmp, &obd_types) {
104 type = list_entry(tmp, struct obd_type, typ_chain);
105 if (strcmp(type->typ_name, name) == 0) {
106 spin_unlock(&obd_types_lock);
110 spin_unlock(&obd_types_lock);
113 EXPORT_SYMBOL(class_search_type);
/*
 * Look up an obd_type by name and pin the module that provides it.
 * When HAVE_MODULE_LOADING_SUPPORT is defined, the type name is mapped to a
 * module name, request_module() is called and the lookup is repeated.
 * NOTE(review): the conditions guarding the module load and the final
 * locked section, plus the return, are on lines missing from this excerpt.
 */
115 struct obd_type *class_get_type(const char *name)
117 struct obd_type *type = class_search_type(name);
119 #ifdef HAVE_MODULE_LOADING_SUPPORT
121 const char *modname = name;
/* Type names that are served by a differently named module. */
123 if (strcmp(modname, "obdfilter") == 0)
126 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
127 modname = LUSTRE_OSP_NAME;
/* Prefix match: any name starting with LUSTRE_MDS_NAME maps to the MDT module. */
129 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
130 modname = LUSTRE_MDT_NAME;
/* request_module() returning 0 is treated as success; search again by the
 * original type name, not the module name. */
132 if (!request_module("%s", modname)) {
133 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
134 type = class_search_type(name);
136 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
142 spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of try_module_get() is not used in the visible
 * code, so a failed module pin would go unnoticed -- confirm intent. */
144 try_module_get(type->typ_dt_ops->o_owner);
145 spin_unlock(&type->obd_type_lock);
149 EXPORT_SYMBOL(class_get_type);
/*
 * Drop the module pin taken by class_get_type(): calls module_put() on the
 * owner of typ_dt_ops while holding type->obd_type_lock.
 * NOTE(review): other statements of this function (lines missing from the
 * excerpt) are not visible; presumably a refcount is decremented too.
 */
151 void class_put_type(struct obd_type *type)
154 spin_lock(&type->obd_type_lock);
156 module_put(type->typ_dt_ops->o_owner);
157 spin_unlock(&type->obd_type_lock);
159 EXPORT_SYMBOL(class_put_type);
/* Upper bound (exclusive) on the length of a type name, asserted at registration. */
161 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type under \a name.
 * Allocates the obd_type plus private copies of the dt/md operation tables
 * and of the name, optionally registers a proc directory, initializes the
 * lu_device_type, and finally links the type into obd_types.
 * On failure the partially built type is torn down again.
 * NOTE(review): several branch conditions, labels and the return statements
 * are on lines missing from this excerpt.
 */
163 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
164 bool enable_proc, struct lprocfs_seq_vars *module_vars,
165 #ifndef HAVE_ONLY_PROCFS_SEQ
166 struct lprocfs_vars *vars,
168 const char *name, struct lu_device_type *ldt)
170 struct obd_type *type;
175 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* Duplicate registration is detected by name. */
177 if (class_search_type(name)) {
178 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
183 OBD_ALLOC(type, sizeof(*type));
/* All three allocations are attempted first and checked together below. */
187 OBD_ALLOC_PTR(type->typ_dt_ops);
188 OBD_ALLOC_PTR(type->typ_md_ops);
189 OBD_ALLOC(type->typ_name, strlen(name) + 1);
191 if (type->typ_dt_ops == NULL ||
192 type->typ_md_ops == NULL ||
193 type->typ_name == NULL)
/* The operation tables are copied by value, so the caller keeps ownership
 * of dt_ops and md_ops. */
196 *(type->typ_dt_ops) = *dt_ops;
197 /* md_ops is optional */
199 *(type->typ_md_ops) = *md_ops;
/* typ_name was allocated with strlen(name) + 1 bytes above, so this fits. */
200 strcpy(type->typ_name, name);
201 spin_lock_init(&type->obd_type_lock);
205 #ifndef HAVE_ONLY_PROCFS_SEQ
207 type->typ_procroot = lprocfs_register(type->typ_name,
213 type->typ_procroot = lprocfs_seq_register(type->typ_name,
/* Reset typ_procroot to NULL on error so the cleanup path below does not
 * try to remove a proc entry that was never created. */
217 if (IS_ERR(type->typ_procroot)) {
218 rc = PTR_ERR(type->typ_procroot);
219 type->typ_procroot = NULL;
226 rc = lu_device_type_init(ldt);
/* Publish the fully initialized type. */
231 spin_lock(&obd_types_lock);
232 list_add(&type->typ_chain, &obd_types);
233 spin_unlock(&obd_types_lock);
/* Failure path: undo whatever was set up, in reverse order. */
238 if (type->typ_name != NULL) {
240 if (type->typ_procroot != NULL) {
241 #ifndef HAVE_ONLY_PROCFS_SEQ
242 lprocfs_try_remove_proc_entry(type->typ_name,
245 remove_proc_subtree(type->typ_name, proc_lustre_root);
249 OBD_FREE(type->typ_name, strlen(name) + 1);
251 if (type->typ_md_ops != NULL)
252 OBD_FREE_PTR(type->typ_md_ops);
253 if (type->typ_dt_ops != NULL)
254 OBD_FREE_PTR(type->typ_dt_ops);
255 OBD_FREE(type, sizeof(*type));
258 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name and free everything
 * class_register_type() allocated for it.
 * A type that still has a non-zero typ_refcnt is not removed from the list;
 * only its operation tables are freed (see below).
 * NOTE(review): the return statements are on lines missing from this excerpt.
 */
260 int class_unregister_type(const char *name)
262 struct obd_type *type = class_search_type(name);
266 CERROR("unknown obd type\n");
270 if (type->typ_refcnt) {
271 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
272 /* This is a bad situation, let's make the best of it */
273 /* Remove ops, but leave the name for debugging */
/* NOTE(review): typ_dt_ops and typ_md_ops are freed here while the type
 * stays on obd_types; the visible code does not reset the pointers, and
 * class_get_type()/class_put_type() dereference typ_dt_ops -- confirm
 * whether OBD_FREE_PTR clears them. */
274 OBD_FREE_PTR(type->typ_dt_ops);
275 OBD_FREE_PTR(type->typ_md_ops);
279 /* we do not use type->typ_procroot as for compatibility purposes
280 * other modules can share names (i.e. lod can use lov entry). so
281 * we can't reference pointer as it can get invalidated when another
282 * module removes the entry */
284 if (type->typ_procroot != NULL) {
285 #ifndef HAVE_ONLY_PROCFS_SEQ
286 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
288 remove_proc_subtree(type->typ_name, proc_lustre_root);
292 if (type->typ_procsym != NULL)
293 lprocfs_remove(&type->typ_procsym);
296 lu_device_type_fini(type->typ_lu);
/* Unlink under the list lock, then free outside of it. */
298 spin_lock(&obd_types_lock);
299 list_del(&type->typ_chain);
300 spin_unlock(&obd_types_lock);
/* The size is computed from the caller's name; it equals the allocation
 * size because the type was found by an exact strcmp() match on that name. */
301 OBD_FREE(type->typ_name, strlen(name) + 1);
302 if (type->typ_dt_ops != NULL)
303 OBD_FREE_PTR(type->typ_dt_ops);
304 if (type->typ_md_ops != NULL)
305 OBD_FREE_PTR(type->typ_md_ops);
306 OBD_FREE(type, sizeof(*type));
308 } /* class_unregister_type */
309 EXPORT_SYMBOL(class_unregister_type);
312 * Create a new obd device.
314 * Find an empty slot in ::obd_devs[], create a new obd device in it.
316 * \param[in] type_name obd device type string.
317 * \param[in] name obd device name.
319 * \retval NULL if create fails, otherwise return the obd device
/*
 * Implementation notes for class_newdev():
 * - the name must be shorter than MAX_OBD_NAME, else ERR_PTR(-EINVAL);
 * - an unknown type yields ERR_PTR(-ENODEV);
 * - a duplicate device name yields ERR_PTR(-EEXIST);
 * - a full obd_devs[] table yields ERR_PTR(-EOVERFLOW).
 * Errors are therefore reported as ERR_PTR values in the visible code.
 * NOTE(review): several conditions, labels and the final return are on
 * lines missing from this excerpt.
 */
322 struct obd_device *class_newdev(const char *type_name, const char *name)
324 struct obd_device *result = NULL;
325 struct obd_device *newdev;
326 struct obd_type *type = NULL;
328 int new_obd_minor = 0;
331 if (strlen(name) >= MAX_OBD_NAME) {
332 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
333 RETURN(ERR_PTR(-EINVAL));
/* Takes a type reference that is dropped again on the failure path below. */
336 type = class_get_type(type_name);
338 CERROR("OBD: unknown type: %s\n", type_name);
339 RETURN(ERR_PTR(-ENODEV));
342 newdev = obd_device_alloc();
344 GOTO(out_type, result = ERR_PTR(-ENOMEM));
346 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* One pass over all slots under the write lock: it both checks for a
 * duplicate name and claims the first free slot. */
348 write_lock(&obd_dev_lock);
349 for (i = 0; i < class_devno_max(); i++) {
350 struct obd_device *obd = class_num2obd(i);
352 if (obd && (strcmp(name, obd->obd_name) == 0)) {
353 CERROR("Device %s already exists at %d, won't add\n",
/* A slot may already have been claimed earlier in this same scan;
 * the statements below give it back before reporting -EEXIST. */
356 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
357 "%p obd_magic %08x != %08x\n", result,
358 result->obd_magic, OBD_DEVICE_MAGIC);
359 LASSERTF(result->obd_minor == new_obd_minor,
360 "%p obd_minor %d != %d\n", result,
361 result->obd_minor, new_obd_minor);
363 obd_devs[result->obd_minor] = NULL;
364 result->obd_name[0]='\0';
366 result = ERR_PTR(-EEXIST);
/* First empty slot and nothing claimed yet: take it. */
369 if (!result && !obd) {
371 result->obd_minor = i;
373 result->obd_type = type;
/* NOTE(review): strncpy() copies at most sizeof - 1 bytes and does not
 * terminate on truncation; this relies on the last byte of obd_name
 * already being zero -- confirm the slab allocation zeroes the object.
 * The length check at the top makes truncation unlikely if obd_name
 * holds MAX_OBD_NAME bytes. */
374 strncpy(result->obd_name, name,
375 sizeof(result->obd_name) - 1);
376 obd_devs[i] = result;
379 write_unlock(&obd_dev_lock);
381 if (result == NULL && i >= class_devno_max()) {
382 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
384 GOTO(out, result = ERR_PTR(-EOVERFLOW));
390 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
391 result->obd_name, result);
/* Failure path: release the unused device and the type reference. */
395 obd_device_free(newdev);
397 class_put_type(type);
/*
 * Remove \a obd from obd_devs[] and free it, then drop the reference on its
 * obd_type. The device must carry a valid magic, must be the entry stored at
 * obd_devs[obd->obd_minor] and must have a type; all three are asserted.
 */
401 void class_release_dev(struct obd_device *obd)
/* Saved up front because obd is freed before the type reference is dropped. */
403 struct obd_type *obd_type = obd->obd_type;
405 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
406 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
407 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
408 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
409 LASSERT(obd_type != NULL);
411 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
412 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
/* Clear the slot under the write lock; the free happens outside of it. */
414 write_lock(&obd_dev_lock);
415 obd_devs[obd->obd_minor] = NULL;
416 write_unlock(&obd_dev_lock);
417 obd_device_free(obd);
419 class_put_type(obd_type);
/*
 * Find the obd_devs[] index of the device called \a name.
 * Scans all slots under the obd_dev_lock read lock; a name match only
 * counts once the device has obd_attached set.
 * NOTE(review): the return statements are on lines missing from this
 * excerpt; callers in this file treat a negative result as not found.
 */
422 int class_name2dev(const char *name)
429 read_lock(&obd_dev_lock);
430 for (i = 0; i < class_devno_max(); i++) {
431 struct obd_device *obd = class_num2obd(i);
433 if (obd && strcmp(name, obd->obd_name) == 0) {
434 /* Make sure we finished attaching before we give
435 out any references */
436 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
437 if (obd->obd_attached) {
438 read_unlock(&obd_dev_lock);
444 read_unlock(&obd_dev_lock);
448 EXPORT_SYMBOL(class_name2dev);
/*
 * Resolve a device name to its obd_device via class_name2dev() and
 * class_num2obd().
 * NOTE(review): the range check uses "dev > class_devno_max()" whereas
 * class_num2obd() accepts only "num < class_devno_max()"; an index equal to
 * the maximum therefore passes this check and is rejected by the callee.
 * The statement executed when the check fails is not visible here.
 */
450 struct obd_device *class_name2obd(const char *name)
452 int dev = class_name2dev(name);
454 if (dev < 0 || dev > class_devno_max())
456 return class_num2obd(dev);
458 EXPORT_SYMBOL(class_name2obd);
/*
 * Find the obd_devs[] index of the device whose obd_uuid equals \a uuid.
 * Scans all slots under the obd_dev_lock read lock. Unlike
 * class_name2dev(), the visible code does not test obd_attached.
 * NOTE(review): the return statements are on lines missing from this excerpt.
 */
460 int class_uuid2dev(struct obd_uuid *uuid)
464 read_lock(&obd_dev_lock);
465 for (i = 0; i < class_devno_max(); i++) {
466 struct obd_device *obd = class_num2obd(i);
468 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
469 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
470 read_unlock(&obd_dev_lock);
474 read_unlock(&obd_dev_lock);
478 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Resolve a uuid to its obd_device via class_uuid2dev() and class_num2obd().
 * NOTE(review): any check on the index between the two calls is on lines
 * missing from this excerpt.
 */
480 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
482 int dev = class_uuid2dev(uuid);
485 return class_num2obd(dev);
487 EXPORT_SYMBOL(class_uuid2obd);
490 * Get obd device from ::obd_devs[]
492 * \param num [in] array index
494 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
495 * otherwise return the obd device there.
/*
 * Implementation notes for class_num2obd():
 * indexes at or above class_devno_max() leave the result at its NULL
 * initializer; a device found in the slot must carry the right magic and
 * have obd_minor equal to the index, both asserted.
 * NOTE(review): only an upper bound on num is checked in the visible code;
 * no test for a negative index is visible -- confirm callers never pass one.
 * No lock is taken in the visible lines; the callers in this file hold
 * obd_dev_lock around their scans.
 */
497 struct obd_device *class_num2obd(int num)
499 struct obd_device *obd = NULL;
501 if (num < class_devno_max()) {
506 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
507 "%p obd_magic %08x != %08x\n",
508 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
509 LASSERTF(obd->obd_minor == num,
510 "%p obd_minor %0d != %0d\n",
511 obd, obd->obd_minor, num);
516 EXPORT_SYMBOL(class_num2obd);
519 * Get obd devices count. Device in any
521 * \retval obd device count
/*
 * Count the devices present in obd_devs[] under the obd_dev_lock read lock.
 * NOTE(review): this loop runs to "index <= max_index" while every other
 * scan in this file stops at "< class_devno_max()"; the extra iteration is
 * absorbed by the bound check inside class_num2obd(). The counting
 * statement and the return are on lines missing from this excerpt.
 */
523 int get_devices_count(void)
525 int index, max_index = class_devno_max(), dev_count = 0;
527 read_lock(&obd_dev_lock);
528 for (index = 0; index <= max_index; index++) {
529 struct obd_device *obd = class_num2obd(index);
533 read_unlock(&obd_dev_lock);
537 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per device in obd_devs[]: index, status, type
 * name, device name, uuid and current obd_refcount.
 * The status is chosen by the first flag that is set, tested in the order
 * obd_stopping, obd_set_up, obd_attached.
 * NOTE(review): the status strings themselves and the skip of empty slots
 * are on lines missing from this excerpt.
 */
539 void class_obd_list(void)
544 read_lock(&obd_dev_lock);
545 for (i = 0; i < class_devno_max(); i++) {
546 struct obd_device *obd = class_num2obd(i);
550 if (obd->obd_stopping)
552 else if (obd->obd_set_up)
554 else if (obd->obd_attached)
558 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
559 i, status, obd->obd_type->typ_name,
560 obd->obd_name, obd->obd_uuid.uuid,
561 atomic_read(&obd->obd_refcount));
563 read_unlock(&obd_dev_lock);
567 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
568 specified, then only the client with that uuid is returned,
569 otherwise any client connected to the tgt is returned.
The scan runs under the obd_dev_lock read lock, which is released
before returning; no device reference is taken in the visible code. */
570 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
571 const char * typ_name,
572 struct obd_uuid *grp_uuid)
576 read_lock(&obd_dev_lock);
577 for (i = 0; i < class_devno_max(); i++) {
578 struct obd_device *obd = class_num2obd(i);
/* Prefix comparison: only the first strlen(typ_name) characters of the
 * device type name have to match. */
582 if ((strncmp(obd->obd_type->typ_name, typ_name,
583 strlen(typ_name)) == 0)) {
/* Target uuid must match; the group uuid is compared only when given. */
584 if (obd_uuid_equals(tgt_uuid,
585 &obd->u.cli.cl_target_uuid) &&
586 ((grp_uuid)? obd_uuid_equals(grp_uuid,
587 &obd->obd_uuid) : 1)) {
588 read_unlock(&obd_dev_lock);
593 read_unlock(&obd_dev_lock);
597 EXPORT_SYMBOL(class_find_client_obd);
599 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
600 searching at *next, and if a device is found, the next index to look
601 at is saved in *next. If next is NULL, then the first matching device
602 will always be returned. */
603 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
/* A start index taken from *next is accepted only inside
 * [0, class_devno_max()); the other branches are not visible here. */
609 else if (*next >= 0 && *next < class_devno_max())
614 read_lock(&obd_dev_lock);
615 for (; i < class_devno_max(); i++) {
616 struct obd_device *obd = class_num2obd(i);
620 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
623 read_unlock(&obd_dev_lock);
627 read_unlock(&obd_dev_lock);
631 EXPORT_SYMBOL(class_devices_in_group);
634 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
635 * adjust sptlrpc settings accordingly.
/*
 * Implementation notes for class_notify_sptlrpc_conf():
 * every set-up, non-stopping device of type mdc, osc, mdt or ost whose name
 * starts with the first \a namelen bytes of \a fsname is sent
 * KEY_SPTLRPC_CONF through obd_set_info_async() on its self export.
 * NOTE(review): how rc2 is folded into the result and what is returned are
 * on lines missing from this excerpt.
 */
637 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
639 struct obd_device *obd;
643 LASSERT(namelen > 0);
645 read_lock(&obd_dev_lock);
646 for (i = 0; i < class_devno_max(); i++) {
647 obd = class_num2obd(i);
649 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
652 /* only notify mdc, osc, mdt, ost */
653 type = obd->obd_type->typ_name;
654 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
655 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
656 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
657 strcmp(type, LUSTRE_OST_NAME) != 0)
/* Non-zero strncmp() means the device name does not start with fsname. */
660 if (strncmp(obd->obd_name, fsname, namelen))
/* Hold a device reference so the read lock can be dropped around the
 * obd_set_info_async() call; the lock is retaken before the scan
 * continues. */
663 class_incref(obd, __FUNCTION__, obd);
664 read_unlock(&obd_dev_lock);
665 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
666 sizeof(KEY_SPTLRPC_CONF),
667 KEY_SPTLRPC_CONF, 0, NULL, NULL);
669 class_decref(obd, __FUNCTION__, obd);
670 read_lock(&obd_dev_lock);
672 read_unlock(&obd_dev_lock);
675 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches(): obd_device_cachep,
 * obdo_cachep, import_cachep and capa_cachep.
 * The visible code guards obd_device_cachep with a NULL test and resets the
 * pointer afterwards, so that a later obd_init_caches() passes its
 * LASSERT(... == NULL) checks.
 * NOTE(review): the guards and resets for the other caches are partly on
 * lines missing from this excerpt.
 */
677 void obd_cleanup_caches(void)
680 if (obd_device_cachep) {
681 kmem_cache_destroy(obd_device_cachep);
682 obd_device_cachep = NULL;
685 kmem_cache_destroy(obdo_cachep);
689 kmem_cache_destroy(import_cachep);
690 import_cachep = NULL;
693 kmem_cache_destroy(capa_cachep);
/*
 * Create the four slab caches used by this file. Each cache pointer is
 * asserted to be NULL before creation; any failure sets rc to -ENOMEM and
 * jumps to the out label, where obd_cleanup_caches() undoes the caches
 * created so far.
 * NOTE(review): the remaining kmem_cache_create() arguments, some NULL
 * tests and the return statements are on lines missing from this excerpt.
 */
699 int obd_init_caches(void)
704 LASSERT(obd_device_cachep == NULL);
705 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
706 sizeof(struct obd_device),
708 if (!obd_device_cachep)
709 GOTO(out, rc = -ENOMEM);
711 LASSERT(obdo_cachep == NULL);
712 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
715 GOTO(out, rc = -ENOMEM);
717 LASSERT(import_cachep == NULL);
718 import_cachep = kmem_cache_create("ll_import_cache",
719 sizeof(struct obd_import),
722 GOTO(out, rc = -ENOMEM);
724 LASSERT(capa_cachep == NULL);
725 capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
728 GOTO(out, rc = -ENOMEM);
732 obd_cleanup_caches();
736 /* map connection to client */
/* Resolves conn->cookie to an export through class_handle2object().
 * Two special cases are logged before the lookup: a null handle and a
 * cookie of -1, which requests a new connection.
 * NOTE(review): what is returned in the special cases and after the lookup
 * is on lines missing from this excerpt; class_conn2obd() drops an export
 * reference after calling this, so the lookup presumably returns one. */
737 struct obd_export *class_conn2export(struct lustre_handle *conn)
739 struct obd_export *export;
743 CDEBUG(D_CACHE, "looking for null handle\n");
747 if (conn->cookie == -1) { /* this means assign a new connection */
748 CDEBUG(D_CACHE, "want a new connection\n");
752 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
753 export = class_handle2object(conn->cookie, NULL);
756 EXPORT_SYMBOL(class_conn2export);
/* Map an export to an obd_device.
 * NOTE(review): the body is on lines missing from this excerpt; presumably
 * it returns exp->exp_obd when exp is non-NULL -- confirm. */
758 struct obd_device *class_exp2obd(struct obd_export *exp)
764 EXPORT_SYMBOL(class_exp2obd);
/*
 * Map a connection handle to the obd_device behind its export.
 * The export is looked up with class_conn2export(), exp_obd is read, and
 * one export reference is dropped with class_export_put(). No device
 * reference is taken in the visible code.
 * NOTE(review): the NULL test on the export and the return statements are
 * on lines missing from this excerpt.
 */
766 struct obd_device *class_conn2obd(struct lustre_handle *conn)
768 struct obd_export *export;
769 export = class_conn2export(conn);
771 struct obd_device *obd = export->exp_obd;
772 class_export_put(export);
777 EXPORT_SYMBOL(class_conn2obd);
/*
 * Return the client import (u.cli.cl_import) of the device behind \a exp.
 * NOTE(review): exp is dereferenced in the initializer; any NULL test on
 * obd is on lines missing from this excerpt.
 */
779 struct obd_import *class_exp2cliimp(struct obd_export *exp)
781 struct obd_device *obd = exp->exp_obd;
784 return obd->u.cli.cl_import;
786 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the device that the
 * connection handle resolves to through class_conn2obd().
 * NOTE(review): any NULL test on obd is on lines missing from this excerpt.
 */
788 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
790 struct obd_device *obd = class_conn2obd(conn);
793 return obd->u.cli.cl_import;
795 EXPORT_SYMBOL(class_conn2cliimp);
797 /* Export management functions */
/*
 * Final teardown of an export whose refcount has reached zero (asserted).
 * Releases the connection if there is one, asserts that all request/reply
 * lists are empty, calls obd_destroy_export(), drops the device reference
 * tagged "export" and frees the export through OBD_FREE_RCU.
 */
798 static void class_export_destroy(struct obd_export *exp)
800 struct obd_device *obd = exp->exp_obd;
803 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
804 LASSERT(obd != NULL);
806 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
807 exp->exp_client_uuid.uuid, obd->obd_name);
809 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
810 if (exp->exp_connection)
811 ptlrpc_put_connection_superhack(exp->exp_connection);
813 LASSERT(list_empty(&exp->exp_outstanding_replies));
814 LASSERT(list_empty(&exp->exp_uncommitted_replies));
815 LASSERT(list_empty(&exp->exp_req_replay_queue));
816 LASSERT(list_empty(&exp->exp_hp_rpcs));
817 obd_destroy_export(exp);
/* Pairs with the class_incref(obd, "export", export) in class_new_export(). */
818 class_decref(obd, "export", exp);
820 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* hop_addref callback for export handles: takes one export reference. */
824 static void export_handle_addref(void *export)
826 class_export_get(export);
/* Handle operations passed to class_handle_hash() in class_new_export().
 * NOTE(review): further members may be on lines missing from this excerpt. */
829 static struct portals_handle_ops export_handle_ops = {
830 .hop_addref = export_handle_addref,
/*
 * Take one reference on \a exp and log the new count.
 * NOTE(review): the return statement is on a line missing from this
 * excerpt; presumably exp is returned -- confirm.
 */
834 struct obd_export *class_export_get(struct obd_export *exp)
836 atomic_inc(&exp->exp_refcount);
837 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
838 atomic_read(&exp->exp_refcount));
841 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp. On the final put the nid stat reference is
 * released and the export is handed to obd_zombie_export_add(); it is not
 * destroyed in this function.
 */
843 void class_export_put(struct obd_export *exp)
845 LASSERT(exp != NULL);
846 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* The logged value is computed as current - 1 before the decrement happens. */
847 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
848 atomic_read(&exp->exp_refcount) - 1);
850 if (atomic_dec_and_test(&exp->exp_refcount)) {
851 LASSERT(!list_empty(&exp->exp_obd_chain));
852 CDEBUG(D_IOCTL, "final put %p/%s\n",
853 exp, exp->exp_client_uuid.uuid);
855 /* release nid stat reference */
856 lprocfs_exp_cleanup(exp);
858 obd_zombie_export_add(exp);
861 EXPORT_SYMBOL(class_export_put);
863 /* Creates a new export, adds it to the hash table, and returns a
864 * pointer to it. The refcount is 2: one for the hash reference, and
865 * one for the pointer returned by this function. */
/* Failures are reported as ERR_PTR values: -ENOMEM when the allocation
 * fails, -ENODEV when the device is stopping or has no uuid hash, and
 * -EALREADY when an export for the same client uuid already exists.
 * NOTE(review): several conditions, labels and the return statements are
 * on lines missing from this excerpt. */
866 struct obd_export *class_new_export(struct obd_device *obd,
867 struct obd_uuid *cluuid)
869 struct obd_export *export;
870 cfs_hash_t *hash = NULL;
874 OBD_ALLOC_PTR(export);
876 return ERR_PTR(-ENOMEM);
878 export->exp_conn_cnt = 0;
879 export->exp_lock_hash = NULL;
880 export->exp_flock_hash = NULL;
881 atomic_set(&export->exp_refcount, 2);
882 atomic_set(&export->exp_rpc_count, 0);
883 atomic_set(&export->exp_cb_count, 0);
884 atomic_set(&export->exp_locks_count, 0);
885 #if LUSTRE_TRACKS_LOCK_EXP_REFS
886 INIT_LIST_HEAD(&export->exp_locks_list);
887 spin_lock_init(&export->exp_locks_list_guard);
889 atomic_set(&export->exp_replay_count, 0);
890 export->exp_obd = obd;
891 INIT_LIST_HEAD(&export->exp_outstanding_replies);
892 spin_lock_init(&export->exp_uncommitted_replies_lock);
893 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
894 INIT_LIST_HEAD(&export->exp_req_replay_queue);
895 INIT_LIST_HEAD(&export->exp_handle.h_link);
896 INIT_LIST_HEAD(&export->exp_hp_rpcs);
897 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* Registers the handle; class_connect() later reads h_cookie from it. */
898 class_handle_hash(&export->exp_handle, &export_handle_ops);
899 export->exp_last_request_time = cfs_time_current_sec();
900 spin_lock_init(&export->exp_lock);
901 spin_lock_init(&export->exp_rpc_lock);
902 INIT_HLIST_NODE(&export->exp_uuid_hash);
903 INIT_HLIST_NODE(&export->exp_nid_hash);
904 spin_lock_init(&export->exp_bl_list_lock);
905 INIT_LIST_HEAD(&export->exp_bl_list);
907 export->exp_sp_peer = LUSTRE_SP_ANY;
908 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
909 export->exp_client_uuid = *cluuid;
910 obd_init_export(export);
/* First locked section: bail out if the device is stopping and pin the
 * uuid hash so it can be used after the lock is dropped. */
912 spin_lock(&obd->obd_dev_lock);
913 /* shouldn't happen, but might race */
914 if (obd->obd_stopping)
915 GOTO(exit_unlock, rc = -ENODEV);
917 hash = cfs_hash_getref(obd->obd_uuid_hash);
919 GOTO(exit_unlock, rc = -ENODEV);
920 spin_unlock(&obd->obd_dev_lock);
/* The hash insert runs without obd_dev_lock held. An export whose client
 * uuid equals the device uuid is not inserted into the uuid hash. */
922 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
923 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
925 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
926 obd->obd_name, cluuid->uuid, rc);
927 GOTO(exit_err, rc = -EALREADY);
/* Second locked section: obd_stopping is tested again because the lock
 * was dropped; on a late stop the hash entry is removed again. */
931 spin_lock(&obd->obd_dev_lock);
932 if (obd->obd_stopping) {
933 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
934 GOTO(exit_unlock, rc = -ENODEV);
/* Device reference tagged "export"; released in class_export_destroy(). */
937 class_incref(obd, "export", export);
938 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
939 list_add_tail(&export->exp_obd_chain_timed,
940 &export->exp_obd->obd_exports_timed);
941 export->exp_obd->obd_num_exports++;
942 spin_unlock(&obd->obd_dev_lock);
943 cfs_hash_putref(hash);
/* Failure paths: unlock, drop the hash pin, unhash the handle and free
 * the export directly. */
947 spin_unlock(&obd->obd_dev_lock);
950 cfs_hash_putref(hash);
951 class_handle_unhash(&export->exp_handle);
952 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
953 obd_destroy_export(export);
954 OBD_FREE_PTR(export);
957 EXPORT_SYMBOL(class_new_export);
/*
 * Detach \a exp from its device: unhash the handle, remove the export from
 * the uuid hash if present, move it from obd_exports to
 * obd_unlinked_exports, take it off the timed list and decrement
 * obd_num_exports, all under obd_dev_lock. One export reference is dropped
 * at the end.
 */
959 void class_unlink_export(struct obd_export *exp)
961 class_handle_unhash(&exp->exp_handle);
963 spin_lock(&exp->exp_obd->obd_dev_lock);
964 /* delete an uuid-export hashitem from hashtables */
965 if (!hlist_unhashed(&exp->exp_uuid_hash))
966 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
967 &exp->exp_client_uuid,
968 &exp->exp_uuid_hash);
970 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
971 list_del_init(&exp->exp_obd_chain_timed);
972 exp->exp_obd->obd_num_exports--;
973 spin_unlock(&exp->exp_obd->obd_dev_lock);
974 class_export_put(exp);
976 EXPORT_SYMBOL(class_unlink_export);
978 /* Import management functions */
/*
 * Final teardown of an import whose refcount has reached zero (asserted).
 * Releases the active connection and every entry of imp_conn_list, asserts
 * that imp_sec is already NULL, drops the device reference tagged "import"
 * and frees the import through OBD_FREE_RCU.
 */
979 void class_import_destroy(struct obd_import *imp)
983 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
984 imp->imp_obd->obd_name);
986 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
988 ptlrpc_put_connection_superhack(imp->imp_connection);
/* Pop entries from the head until the connection list is empty. */
990 while (!list_empty(&imp->imp_conn_list)) {
991 struct obd_import_conn *imp_conn;
993 imp_conn = list_entry(imp->imp_conn_list.next,
994 struct obd_import_conn, oic_item);
995 list_del_init(&imp_conn->oic_item);
996 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
997 OBD_FREE(imp_conn, sizeof(*imp_conn));
1000 LASSERT(imp->imp_sec == NULL);
/* Pairs with the class_incref(obd, "import", imp) in class_new_import(). */
1001 class_decref(imp->imp_obd, "import", imp);
1002 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* hop_addref callback for import handles: takes one import reference. */
1006 static void import_handle_addref(void *import)
1008 class_import_get(import);
/* Handle operations passed to class_handle_hash() in class_new_import().
 * NOTE(review): further members may be on lines missing from this excerpt. */
1011 static struct portals_handle_ops import_handle_ops = {
1012 .hop_addref = import_handle_addref,
/*
 * Take one reference on \a import and log the new count together with the
 * owning device name.
 * NOTE(review): the return statement is on a line missing from this
 * excerpt; presumably import is returned -- confirm.
 */
1016 struct obd_import *class_import_get(struct obd_import *import)
1018 atomic_inc(&import->imp_refcount);
1019 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1020 atomic_read(&import->imp_refcount),
1021 import->imp_obd->obd_name);
1024 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp. The import must not already be on a zombie
 * chain (asserted). On the final put it is handed to
 * obd_zombie_import_add(); it is not destroyed in this function.
 */
1026 void class_import_put(struct obd_import *imp)
1030 LASSERT(list_empty(&imp->imp_zombie_chain));
1031 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* The logged value is computed as current - 1 before the decrement happens. */
1033 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1034 atomic_read(&imp->imp_refcount) - 1,
1035 imp->imp_obd->obd_name);
1037 if (atomic_dec_and_test(&imp->imp_refcount)) {
1038 CDEBUG(D_INFO, "final put import %p\n", imp);
1039 obd_zombie_import_add(imp);
1042 /* catch possible import put race */
1043 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1046 EXPORT_SYMBOL(class_import_put);
/*
 * Initialize the adaptive-timeout state of an import: the network latency
 * estimate starts at 0, and each of the IMP_AT_MAX_PORTALS service
 * estimates starts at INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the last argument of the per-portal at_init() call is on a
 * line missing from this excerpt.
 */
1048 static void init_imp_at(struct imp_at *at) {
1050 at_init(&at->iat_net_latency, 0, 0);
1051 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1052 /* max service estimates are tracked on the server side, so
1053 don't use the AT history here, just use the last reported
1054 val. (But keep hist for proc histogram, worst_ever) */
1055 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialize a new import for \a obd.
 * The import starts in state LUSTRE_IMP_NEW with a refcount of 2, holds a
 * device reference tagged "import", and has its handle registered with
 * import_handle_ops.
 * NOTE(review): the allocation-failure check and the return statement are
 * on lines missing from this excerpt.
 */
1060 struct obd_import *class_new_import(struct obd_device *obd)
1062 struct obd_import *imp;
1064 OBD_ALLOC(imp, sizeof(*imp));
1068 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1069 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1070 INIT_LIST_HEAD(&imp->imp_replay_list);
1071 INIT_LIST_HEAD(&imp->imp_sending_list);
1072 INIT_LIST_HEAD(&imp->imp_delayed_list);
1073 INIT_LIST_HEAD(&imp->imp_committed_list);
/* The replay cursor starts at the head of the (empty) committed list. */
1074 imp->imp_replay_cursor = &imp->imp_committed_list;
1075 spin_lock_init(&imp->imp_lock);
1076 imp->imp_last_success_conn = 0;
1077 imp->imp_state = LUSTRE_IMP_NEW;
/* Released again by class_decref(..., "import", ...) in class_import_destroy(). */
1078 imp->imp_obd = class_incref(obd, "import", imp);
1079 mutex_init(&imp->imp_sec_mutex);
1080 init_waitqueue_head(&imp->imp_recovery_waitq);
1082 atomic_set(&imp->imp_refcount, 2);
1083 atomic_set(&imp->imp_unregistering, 0);
1084 atomic_set(&imp->imp_inflight, 0);
1085 atomic_set(&imp->imp_replay_inflight, 0);
1086 atomic_set(&imp->imp_inval_count, 0);
1087 INIT_LIST_HEAD(&imp->imp_conn_list);
1088 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1089 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1090 init_imp_at(&imp->imp_at);
1092 /* the default magic is V2, will be used in connect RPC, and
1093 * then adjusted according to the flags in request/reply. */
1094 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1098 EXPORT_SYMBOL(class_new_import);
/*
 * Begin destruction of \a import: unhash its handle, bump imp_generation
 * under imp_lock and drop one import reference. The import itself is freed
 * only once the last reference is gone.
 */
1100 void class_destroy_import(struct obd_import *import)
1102 LASSERT(import != NULL);
1103 LASSERT(import != LP_POISON);
1105 class_handle_unhash(&import->imp_handle);
1107 spin_lock(&import->imp_lock);
1108 import->imp_generation++;
1109 spin_unlock(&import->imp_lock);
1110 class_import_put(import);
1112 EXPORT_SYMBOL(class_destroy_import);
1114 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS): record that \a lock holds
 * a reference on \a exp. The first reference links the lock onto
 * exp_locks_list and remembers the export in l_exp_refs_target; later ones
 * only bump l_exp_refs_nr. Everything runs under exp_locks_list_guard.
 */
1116 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1118 spin_lock(&exp->exp_locks_list_guard);
1120 LASSERT(lock->l_exp_refs_nr >= 0);
/* A lock already tracked against a different export is only warned about. */
1122 if (lock->l_exp_refs_target != NULL &&
1123 lock->l_exp_refs_target != exp) {
1124 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1125 exp, lock, lock->l_exp_refs_target);
/* Post-increment: the test sees the count before this reference. */
1127 if ((lock->l_exp_refs_nr ++) == 0) {
1128 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1129 lock->l_exp_refs_target = exp;
1131 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1132 lock, exp, lock->l_exp_refs_nr);
1133 spin_unlock(&exp->exp_locks_list_guard);
1135 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS): drop one tracked reference
 * of \a lock on \a exp. When the count reaches zero the lock is unlinked
 * from the export list and l_exp_refs_target is cleared. Everything runs
 * under exp_locks_list_guard.
 */
1137 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1139 spin_lock(&exp->exp_locks_list_guard);
1140 LASSERT(lock->l_exp_refs_nr > 0);
/* A mismatching export is only warned about; the count is still dropped. */
1141 if (lock->l_exp_refs_target != exp) {
1142 LCONSOLE_WARN("lock %p, "
1143 "mismatching export pointers: %p, %p\n",
1144 lock, lock->l_exp_refs_target, exp);
1146 if (-- lock->l_exp_refs_nr == 0) {
1147 list_del_init(&lock->l_exp_refs_link);
1148 lock->l_exp_refs_target = NULL;
1150 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1151 lock, exp, lock->l_exp_refs_nr);
1152 spin_unlock(&exp->exp_locks_list_guard);
1154 EXPORT_SYMBOL(__class_export_del_lock_ref);
1157 /* A connection defines an export context in which preallocation can
1158 be managed. This releases the export pointer reference, and returns
1159 the export handle, so the export refcount is 1 when this function returns. */
/*
 * Create a new export for client \a cluuid on \a obd and hand its handle
 * cookie back through \a conn. All three arguments must be non-NULL
 * (asserted). A failure of class_new_export() is returned as its PTR_ERR
 * value. The pointer reference obtained from class_new_export() is dropped
 * before returning, so only the cookie identifies the export afterwards.
 * NOTE(review): the error test and the success return are on lines missing
 * from this excerpt.
 */
1161 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1162 struct obd_uuid *cluuid)
1164 struct obd_export *export;
1165 LASSERT(conn != NULL);
1166 LASSERT(obd != NULL);
1167 LASSERT(cluuid != NULL);
1170 export = class_new_export(obd, cluuid);
1172 RETURN(PTR_ERR(export));
1174 conn->cookie = export->exp_handle.h_cookie;
1175 class_export_put(export);
1177 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1178 cluuid->uuid, conn->cookie);
1181 EXPORT_SYMBOL(class_connect);
1183 /* if export is involved in recovery then clean up related things */
/* Two independent parts: recovery counters are adjusted under
 * obd_recovery_task_lock while the device is recovering, then the request
 * and lock replay flags are cleared under exp_lock. */
1184 void class_export_recovery_cleanup(struct obd_export *exp)
1186 struct obd_device *obd = exp->exp_obd;
1188 spin_lock(&obd->obd_recovery_task_lock);
1189 if (obd->obd_recovering) {
1190 if (exp->exp_in_recovery) {
/* exp_lock is nested inside obd_recovery_task_lock here. */
1191 spin_lock(&exp->exp_lock);
1192 exp->exp_in_recovery = 0;
1193 spin_unlock(&exp->exp_lock);
1194 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1195 atomic_dec(&obd->obd_connected_clients);
1198 /* if called during recovery then should update
1199 * obd_stale_clients counter,
1200 * lightweight exports are not counted */
1201 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1202 exp->exp_obd->obd_stale_clients++;
1204 spin_unlock(&obd->obd_recovery_task_lock);
1206 spin_lock(&exp->exp_lock);
1207 /** Cleanup req replay fields */
1208 if (exp->exp_req_replay_needed) {
1209 exp->exp_req_replay_needed = 0;
1211 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1212 atomic_dec(&obd->obd_req_replay_clients);
1215 /** Cleanup lock replay data */
1216 if (exp->exp_lock_replay_needed) {
1217 exp->exp_lock_replay_needed = 0;
1219 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1220 atomic_dec(&obd->obd_lock_replay_clients);
1222 spin_unlock(&exp->exp_lock);
1225 /* This function removes 1-3 references from the export:
1226 * 1 - for the export pointer passed in
1227 * and, if a disconnect is really needed:
1228 * 2 - for removal from the hash
1229 * 3 - in class_unlink_export
1230 * The export passed to this function may be destroyed by the time it returns */
1231 int class_disconnect(struct obd_export *export)
1233 int already_disconnected;
1236 if (export == NULL) {
1237 CWARN("attempting to free NULL export %p\n", export);
/* Test-and-set of exp_disconnected under exp_lock, so that only the first
 * caller performs the actual disconnect. */
1241 spin_lock(&export->exp_lock);
1242 already_disconnected = export->exp_disconnected;
1243 export->exp_disconnected = 1;
1244 spin_unlock(&export->exp_lock);
1246 /* class_cleanup(), abort_recovery(), and class_fail_export()
1247 * all end up in here, and if any of them race we shouldn't
1248 * call extra class_export_puts(). */
1249 if (already_disconnected) {
1250 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1251 GOTO(no_disconn, already_disconnected);
1254 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1255 export->exp_handle.h_cookie);
/* Remove the export from the nid hash if it was ever inserted. */
1257 if (!hlist_unhashed(&export->exp_nid_hash))
1258 cfs_hash_del(export->exp_obd->obd_nid_hash,
1259 &export->exp_connection->c_peer.nid,
1260 &export->exp_nid_hash);
1262 class_export_recovery_cleanup(export);
1263 class_unlink_export(export);
/* Reference 1 from the header comment: dropped on every path that reaches
 * this point, including the already-disconnected one. */
1265 class_export_put(export);
1268 EXPORT_SYMBOL(class_disconnect);
1270 /* Return non-zero for a fully connected export */
/* Connected means exp_conn_cnt > 0 and exp_failed not set, both sampled
 * under exp_lock.
 * NOTE(review): the handling of a NULL exp and the return statement are on
 * lines missing from this excerpt. */
1271 int class_connected_export(struct obd_export *exp)
1276 spin_lock(&exp->exp_lock);
1277 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1278 spin_unlock(&exp->exp_lock);
1282 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list, stamping each with \a flags first.
 * The export whose client uuid equals its device uuid (the self export) is
 * only unlinked from the list, not disconnected.
 */
1284 static void class_disconnect_export_list(struct list_head *list,
1285 enum obd_option flags)
1288 struct obd_export *exp;
1291 /* It's possible that an export may disconnect itself, but
1292 * nothing else will be added to this list. */
1293 while (!list_empty(list)) {
1294 exp = list_entry(list->next, struct obd_export,
1296 /* needed so that CDEBUG can safely be called after obd_disconnect */
1297 class_export_get(exp);
1299 spin_lock(&exp->exp_lock);
1300 exp->exp_flags = flags;
1301 spin_unlock(&exp->exp_lock);
1303 if (obd_uuid_equals(&exp->exp_client_uuid,
1304 &exp->exp_obd->obd_uuid)) {
1306 "exp %p export uuid == obd uuid, don't discon\n",
1308 /* Need to delete this now so we don't end up pointing
1309 * to work_list later when this export is cleaned up. */
1310 list_del_init(&exp->exp_obd_chain);
1311 class_export_put(exp);
/* Second reference: this is the one obd_disconnect() consumes, while the
 * one taken at the top of the loop keeps exp valid for the CDEBUG below. */
1315 class_export_get(exp);
1316 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1317 "last request at "CFS_TIME_T"\n",
1318 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1319 exp, exp->exp_last_request_time);
1320 /* release one export reference anyway */
1321 rc = obd_disconnect(exp);
1323 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1324 obd_export_nid2str(exp), exp, rc);
1325 class_export_put(exp);
/* Disconnect all exports of @obd.
 *
 * Both obd_exports and obd_delayed_exports are spliced onto a private
 * work list while holding obd_dev_lock, leaving the obd lists empty; the
 * work list is then processed outside the lock by
 * class_disconnect_export_list() using the flags derived from
 * exp_flags_from_obd().  A debug message records whether any exports
 * were found. */
1330 void class_disconnect_exports(struct obd_device *obd)
1332 struct list_head work_list;
1335 /* Move all of the exports from obd_exports to a work list, en masse. */
1336 INIT_LIST_HEAD(&work_list);
1337 spin_lock(&obd->obd_dev_lock);
1338 list_splice_init(&obd->obd_exports, &work_list);
1339 list_splice_init(&obd->obd_delayed_exports, &work_list);
1340 spin_unlock(&obd->obd_dev_lock);
1342 if (!list_empty(&work_list)) {
1343 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1344 "disconnecting them\n", obd->obd_minor, obd);
1345 class_disconnect_export_list(&work_list,
1346 exp_flags_from_obd(obd));
1348 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1349 obd->obd_minor, obd);
1352 EXPORT_SYMBOL(class_disconnect_exports);
1354 /* Remove exports that have not completed recovery.
/* Select exports of @obd via @test_export and disconnect them.
 *
 * obd_exports is walked under obd_dev_lock.  The self-export (client
 * uuid equals the obd uuid) and exports whose ted_lr_idx is -1 are
 * examined first and, per the existing comments, are not evicted.  For
 * the rest, "exp_failed || test_export(exp)" is evaluated with exp_lock
 * held, so @test_export must be callable under two spinlocks.  Exports
 * for which that expression is true only have exp_lock released in that
 * branch (presumably they are then skipped - confirm); the others are
 * marked exp_failed, moved to a private work list and logged.
 *
 * After the walk the work list is handed to
 * class_disconnect_export_list() with OBD_OPT_ABORT_RECOV added to the
 * flags from exp_flags_from_obd(). */
1356 void class_disconnect_stale_exports(struct obd_device *obd,
1357 int (*test_export)(struct obd_export *))
1359 struct list_head work_list;
1360 struct obd_export *exp, *n;
1364 INIT_LIST_HEAD(&work_list);
1365 spin_lock(&obd->obd_dev_lock);
1366 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1368 /* don't count self-export as client */
1369 if (obd_uuid_equals(&exp->exp_client_uuid,
1370 &exp->exp_obd->obd_uuid))
1373 /* don't evict clients which have no slot in last_rcvd
1374 * (e.g. lightweight connection) */
1375 if (exp->exp_target_data.ted_lr_idx == -1)
1378 spin_lock(&exp->exp_lock);
1379 if (exp->exp_failed || test_export(exp)) {
1380 spin_unlock(&exp->exp_lock);
1383 exp->exp_failed = 1;
1384 spin_unlock(&exp->exp_lock);
1386 list_move(&exp->exp_obd_chain, &work_list);
1388 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1389 obd->obd_name, exp->exp_client_uuid.uuid,
1390 exp->exp_connection == NULL ? "<unknown>" :
1391 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1392 print_export_data(exp, "EVICTING", 0);
1394 spin_unlock(&obd->obd_dev_lock);
1397 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1398 obd->obd_name, evicted);
1400 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1401 OBD_OPT_ABORT_RECOV);
1404 EXPORT_SYMBOL(class_disconnect_stale_exports);
/* Mark @exp as failed and disconnect it.
 *
 * exp_failed is tested and set under exp_lock.  If it was already set
 * only the "dead export ... skipping" debug message is emitted for that
 * case.  Otherwise the debug log is dumped when obd_dump_on_timeout is
 * set, two references are taken (see the comments below for why), and
 * obd_disconnect() is called; its outcome is logged and one reference is
 * dropped at the end. */
1406 void class_fail_export(struct obd_export *exp)
1408 int rc, already_failed;
1410 spin_lock(&exp->exp_lock);
1411 already_failed = exp->exp_failed;
1412 exp->exp_failed = 1;
1413 spin_unlock(&exp->exp_lock);
1415 if (already_failed) {
1416 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1417 exp, exp->exp_client_uuid.uuid);
1421 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1422 exp, exp->exp_client_uuid.uuid);
1424 if (obd_dump_on_timeout)
1425 libcfs_debug_dumplog();
1427 /* need for safe call CDEBUG after obd_disconnect */
1428 class_export_get(exp);
1430 /* Most callers into obd_disconnect are removing their own reference
1431 * (request, for example) in addition to the one from the hash table.
1432 * We don't have such a reference here, so make one. */
1433 class_export_get(exp);
1434 rc = obd_disconnect(exp);
1436 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1438 CDEBUG(D_HA, "disconnected export %p/%s\n",
1439 exp, exp->exp_client_uuid.uuid);
1440 class_export_put(exp);
1442 EXPORT_SYMBOL(class_fail_export);
/* Return a printable NID for @exp.
 * When exp_connection is set this is libcfs_nid2str() of the peer NID.
 * NOTE(review): the value returned for an export without a connection
 * is not covered by this comment - check the fallback return. */
1444 char *obd_export_nid2str(struct obd_export *exp)
1446 if (exp->exp_connection != NULL)
1447 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1451 EXPORT_SYMBOL(obd_export_nid2str);
/* Evict exports of @obd that are hashed under the NID given as text.
 *
 * @nid is parsed with libcfs_str2nid() (the const qualifier is cast
 * away for that call).  If obd_stopping is set the function returns
 * immediately with the zero-initialised exports_evicted.  Otherwise a
 * reference on obd_nid_hash is taken while obd_dev_lock is held and
 * released with cfs_hash_putref() before returning, so the hash is
 * used outside the device lock.
 *
 * An export found by cfs_hash_lookup() is asserted to carry the
 * requested NID and not to be the self-export, reported on the console,
 * failed via class_fail_export(), and then has class_export_put()
 * called on it (presumably dropping the reference the lookup returned -
 * confirm).  Returns exports_evicted; a debug message is logged when
 * that count is zero. */
1453 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1455 cfs_hash_t *nid_hash;
1456 struct obd_export *doomed_exp = NULL;
1457 int exports_evicted = 0;
1459 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1461 spin_lock(&obd->obd_dev_lock);
1462 /* umount has run already, so evict thread should leave
1463 * its task to umount thread now */
1464 if (obd->obd_stopping) {
1465 spin_unlock(&obd->obd_dev_lock);
1466 return exports_evicted;
1468 nid_hash = obd->obd_nid_hash;
1469 cfs_hash_getref(nid_hash);
1470 spin_unlock(&obd->obd_dev_lock);
1473 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1474 if (doomed_exp == NULL)
1477 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1478 "nid %s found, wanted nid %s, requested nid %s\n",
1479 obd_export_nid2str(doomed_exp),
1480 libcfs_nid2str(nid_key), nid);
1481 LASSERTF(doomed_exp != obd->obd_self_export,
1482 "self-export is hashed by NID?\n");
1484 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1485 "request\n", obd->obd_name,
1486 obd_uuid2str(&doomed_exp->exp_client_uuid),
1487 obd_export_nid2str(doomed_exp));
1488 class_fail_export(doomed_exp);
1489 class_export_put(doomed_exp);
1492 cfs_hash_putref(nid_hash);
1494 if (!exports_evicted)
1495 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1496 obd->obd_name, nid);
1497 return exports_evicted;
1499 EXPORT_SYMBOL(obd_export_evict_by_nid);
/* Evict the export of @obd whose client uuid equals @uuid.
 *
 * Returns immediately with exports_evicted (initialised to 0) if
 * obd_stopping is set.  Otherwise a reference on obd_uuid_hash is taken
 * under obd_dev_lock and dropped on every later exit path.  A request to
 * evict the obd's own uuid is refused with an error message.  A matching
 * export found by cfs_hash_lookup() is failed via class_fail_export()
 * and then has class_export_put() called on it (presumably releasing the
 * lookup reference - confirm); when nothing matches an error is logged.
 *
 * NOTE(review): "adminstrative" in the CWARN text is misspelled; it is
 * left as is here because the message is runtime output that scripts
 * may match on. */
1501 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1503 cfs_hash_t *uuid_hash;
1504 struct obd_export *doomed_exp = NULL;
1505 struct obd_uuid doomed_uuid;
1506 int exports_evicted = 0;
1508 spin_lock(&obd->obd_dev_lock);
1509 if (obd->obd_stopping) {
1510 spin_unlock(&obd->obd_dev_lock);
1511 return exports_evicted;
1513 uuid_hash = obd->obd_uuid_hash;
1514 cfs_hash_getref(uuid_hash);
1515 spin_unlock(&obd->obd_dev_lock);
1517 obd_str2uuid(&doomed_uuid, uuid);
1518 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1519 CERROR("%s: can't evict myself\n", obd->obd_name);
1520 cfs_hash_putref(uuid_hash);
1521 return exports_evicted;
1524 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1526 if (doomed_exp == NULL) {
1527 CERROR("%s: can't disconnect %s: no exports found\n",
1528 obd->obd_name, uuid);
1530 CWARN("%s: evicting %s at adminstrative request\n",
1531 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1532 class_fail_export(doomed_exp);
1533 class_export_put(doomed_exp);
1536 cfs_hash_putref(uuid_hash);
1538 return exports_evicted;
1540 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/* Optional per-export dump callback, only built when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is enabled.  It is NULL by default and is
 * exported so that another module can install a handler; it is called
 * from print_export_data() when lock dumping is requested. */
1542 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1543 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1544 EXPORT_SYMBOL(class_export_dump_hook);
/* Emit one D_HA debug line describing @exp, tagged with @status.
 *
 * The line contains, in order: obd name, status tag, export pointer,
 * client uuid, NID string, export refcount, the rpc / callback / lock
 * counters, the disconnected / delayed / failed flags, the number of
 * outstanding replies with the first reply pointer (followed by "..."
 * when there are more than three), and exp_last_committed.
 * exp_outstanding_replies is walked under exp_lock to gather the reply
 * information.
 *
 * With LUSTRE_TRACKS_LOCK_EXP_REFS enabled, class_export_dump_hook is
 * additionally invoked when the locks argument is non-zero and a hook
 * is installed. */
1547 static void print_export_data(struct obd_export *exp, const char *status,
1550 struct ptlrpc_reply_state *rs;
1551 struct ptlrpc_reply_state *first_reply = NULL;
1554 spin_lock(&exp->exp_lock);
1555 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1561 spin_unlock(&exp->exp_lock);
1563 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1564 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1565 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1566 atomic_read(&exp->exp_rpc_count),
1567 atomic_read(&exp->exp_cb_count),
1568 atomic_read(&exp->exp_locks_count),
1569 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1570 nreplies, first_reply, nreplies > 3 ? "..." : "",
1571 exp->exp_last_committed);
1572 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1573 if (locks && class_export_dump_hook != NULL)
1574 class_export_dump_hook(exp);
/* Log every export associated with @obd through print_export_data().
 *
 * The obd's three per-device lists are walked under obd_dev_lock and
 * tagged "ACTIVE" (obd_exports), "UNLINKED" (obd_unlinked_exports) and
 * "DELAYED" (obd_delayed_exports).  The global obd_zombie_exports list
 * is then walked under obd_zombie_impexp_lock and tagged "ZOMBIE"; that
 * list is not filtered by @obd.  @locks is passed through unchanged. */
1578 void dump_exports(struct obd_device *obd, int locks)
1580 struct obd_export *exp;
1582 spin_lock(&obd->obd_dev_lock);
1583 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1584 print_export_data(exp, "ACTIVE", locks);
1585 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1586 print_export_data(exp, "UNLINKED", locks);
1587 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1588 print_export_data(exp, "DELAYED", locks);
1589 spin_unlock(&obd->obd_dev_lock);
1590 spin_lock(&obd_zombie_impexp_lock);
1591 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1592 print_export_data(exp, "ZOMBIE", locks);
1593 spin_unlock(&obd_zombie_impexp_lock);
1595 EXPORT_SYMBOL(dump_exports);
/* Block until @obd has no entries left on obd_unlinked_exports.
 *
 * The caller must already have emptied obd_exports (asserted).  The
 * unlinked list is polled under obd_dev_lock; between polls the lock is
 * dropped and the task sleeps uninterruptibly for "waited" seconds.
 * Whenever waited is above 5 and a power of two, a console warning with
 * the obd refcount is printed and all exports are dumped (with locks).
 * NOTE(review): how "waited" advances between iterations is not
 * described here - check the loop body. */
1597 void obd_exports_barrier(struct obd_device *obd)
1600 LASSERT(list_empty(&obd->obd_exports));
1601 spin_lock(&obd->obd_dev_lock);
1602 while (!list_empty(&obd->obd_unlinked_exports)) {
1603 spin_unlock(&obd->obd_dev_lock);
1604 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1605 cfs_time_seconds(waited));
1606 if (waited > 5 && IS_PO2(waited)) {
1607 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1608 "more than %d seconds. "
1609 "The obd refcount = %d. Is it stuck?\n",
1610 obd->obd_name, waited,
1611 atomic_read(&obd->obd_refcount));
1612 dump_exports(obd, 1);
1615 spin_lock(&obd->obd_dev_lock);
1617 spin_unlock(&obd->obd_dev_lock);
1619 EXPORT_SYMBOL(obd_exports_barrier);
1621 /* Total amount of zombies to be destroyed */
/* Read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle() to decide whether the reaper has work pending. */
1622 static int zombies_count = 0;
1625 * kill zombie imports and exports
/* Drain the zombie import and export lists.
 *
 * Each pass detaches at most the first import and the first export from
 * their lists while holding obd_zombie_impexp_lock, then destroys them
 * with the lock released via class_import_destroy() and
 * class_export_destroy().  After each destroy the lock is briefly taken
 * again (presumably to update the zombie accounting - confirm).  The
 * loop repeats until a pass finds neither an import nor an export. */
1627 void obd_zombie_impexp_cull(void)
1629 struct obd_import *import;
1630 struct obd_export *export;
1634 spin_lock(&obd_zombie_impexp_lock);
1637 if (!list_empty(&obd_zombie_imports)) {
1638 import = list_entry(obd_zombie_imports.next,
1641 list_del_init(&import->imp_zombie_chain);
1645 if (!list_empty(&obd_zombie_exports)) {
1646 export = list_entry(obd_zombie_exports.next,
1649 list_del_init(&export->exp_obd_chain);
1652 spin_unlock(&obd_zombie_impexp_lock);
1654 if (import != NULL) {
1655 class_import_destroy(import);
1656 spin_lock(&obd_zombie_impexp_lock);
1658 spin_unlock(&obd_zombie_impexp_lock);
1661 if (export != NULL) {
1662 class_export_destroy(export);
1663 spin_lock(&obd_zombie_impexp_lock);
1665 spin_unlock(&obd_zombie_impexp_lock);
1669 } while (import != NULL || export != NULL);
/* State of the zombie reaper thread:
 *  obd_zombie_start - completed by the thread once it is running
 *  obd_zombie_stop  - completed by the thread just before it exits
 *  obd_zombie_flags - bit field tested/set with test_bit()/set_bit()
 *  obd_zombie_waitq - wait queue shared by the thread and barrier callers
 *  obd_zombie_pid   - pid recorded by the thread so that
 *                     obd_zombie_barrier() can avoid waiting for itself */
1673 static struct completion obd_zombie_start;
1674 static struct completion obd_zombie_stop;
1675 static unsigned long obd_zombie_flags;
1676 static wait_queue_head_t obd_zombie_waitq;
1677 static pid_t obd_zombie_pid;
/* NOTE(review): OBD_ZOMBIE_STOP is handed to test_bit()/set_bit(), which
 * take a bit number, so 0x0001 selects bit 1 rather than acting as a
 * mask.  All users in this file are consistent, so it works - confirm
 * before reusing the value as a mask. */
1680 OBD_ZOMBIE_STOP = 0x0001,
1684 * check for work for kill zombie import/export thread.
/* Evaluate, under obd_zombie_impexp_lock, whether the reaper may stay
 * asleep: rc is non-zero only when zombies_count is 0 and the
 * OBD_ZOMBIE_STOP bit is clear.  The reaper thread therefore waits for
 * the negation of this result.  @arg is not used in the computation
 * shown; the void* parameter matches the callback form under which this
 * function is also registered. */
1686 static int obd_zombie_impexp_check(void *arg)
1690 spin_lock(&obd_zombie_impexp_lock);
1691 rc = (zombies_count == 0) &&
1692 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1693 spin_unlock(&obd_zombie_impexp_lock);
1699 * Add export to the obd_zombie thread and notify it.
/* @exp must still be linked on one of its obd's export lists (asserted);
 * it is unlinked from that list under obd_dev_lock, then added to the
 * head of obd_zombie_exports under obd_zombie_impexp_lock, reusing the
 * same exp_obd_chain link.  Waiters on the zombie wait queue are woken
 * afterwards. */
1701 static void obd_zombie_export_add(struct obd_export *exp) {
1702 spin_lock(&exp->exp_obd->obd_dev_lock);
1703 LASSERT(!list_empty(&exp->exp_obd_chain));
1704 list_del_init(&exp->exp_obd_chain);
1705 spin_unlock(&exp->exp_obd->obd_dev_lock);
1706 spin_lock(&obd_zombie_impexp_lock);
1708 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1709 spin_unlock(&obd_zombie_impexp_lock);
1711 obd_zombie_impexp_notify();
1715 * Add import to the obd_zombie thread and notify it.
/* Preconditions (asserted): @imp has no security context (imp_sec) and
 * no request pool (imp_rq_pool) left, and is not already on a zombie
 * list.  The import is added to the head of obd_zombie_imports under
 * obd_zombie_impexp_lock and waiters on the zombie wait queue are woken
 * afterwards. */
1717 static void obd_zombie_import_add(struct obd_import *imp) {
1718 LASSERT(imp->imp_sec == NULL);
1719 LASSERT(imp->imp_rq_pool == NULL);
1720 spin_lock(&obd_zombie_impexp_lock);
1721 LASSERT(list_empty(&imp->imp_zombie_chain));
1723 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1724 spin_unlock(&obd_zombie_impexp_lock);
1726 obd_zombie_impexp_notify();
1730 * notify import/export destroy thread about new zombie.
/* Wakes every sleeper on obd_zombie_waitq (wake_up_all rather than a
 * single wake-up), so both the reaper thread and any barrier callers
 * re-evaluate their wait conditions. */
1732 static void obd_zombie_impexp_notify(void)
1735 * Make sure obd_zomebie_impexp_thread get this notification.
1736 * It is possible this signal only get by obd_zombie_barrier, and
1737 * barrier gulps this notification and sleeps away and hangs ensues
1739 wake_up_all(&obd_zombie_waitq);
1743 * check whether obd_zombie is idle
/* Wait condition for obd_zombie_barrier(): rc is non-zero when
 * zombies_count, read under obd_zombie_impexp_lock, is 0.  Must not be
 * used once the reaper has been told to stop (asserted via the
 * OBD_ZOMBIE_STOP bit). */
1745 static int obd_zombie_is_idle(void)
1749 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1750 spin_lock(&obd_zombie_impexp_lock);
1751 rc = (zombies_count == 0);
1752 spin_unlock(&obd_zombie_impexp_lock);
1757 * wait when obd_zombie import/export queues become empty
/* Sleeps on obd_zombie_waitq until obd_zombie_is_idle() holds, using a
 * zero-initialised l_wait_info.  When called from the reaper thread
 * itself (current pid equals obd_zombie_pid) the wait is skipped, as
 * the thread would otherwise wait for its own work. */
1759 void obd_zombie_barrier(void)
1761 struct l_wait_info lwi = { 0 };
1763 if (obd_zombie_pid == current_pid())
1764 /* don't wait for myself */
1766 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1768 EXPORT_SYMBOL(obd_zombie_barrier);
1773 * destroy zombie export/import thread.
/* Body of the reaper kernel thread started by obd_zombie_impexp_init().
 *
 * Signals obd_zombie_start, records its pid, then loops until the
 * OBD_ZOMBIE_STOP bit is set: sleep until obd_zombie_impexp_check()
 * reports work (or stop), cull the zombie lists, and wake a waiter on
 * obd_zombie_waitq.  obd_zombie_stop is completed on the way out.
 *
 * NOTE(review): obd_zombie_pid is assigned after
 * complete(&obd_zombie_start), so the starter can resume before the pid
 * is recorded - confirm nothing relies on the pid that early. */
1775 static int obd_zombie_impexp_thread(void *unused)
1777 unshare_fs_struct();
1778 complete(&obd_zombie_start);
1780 obd_zombie_pid = current_pid();
1782 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1783 struct l_wait_info lwi = { 0 };
1785 l_wait_event(obd_zombie_waitq,
1786 !obd_zombie_impexp_check(NULL), &lwi);
1787 obd_zombie_impexp_cull();
1790 * Notify obd_zombie_barrier callers that queues
1793 wake_up(&obd_zombie_waitq);
1796 complete(&obd_zombie_stop);
1801 #else /* ! KERNEL */
/* Non-kernel build state:
 *  zombie_recur              - entry counter used by
 *                              obd_zombie_impexp_kill() to detect nesting
 *  obd_zombie_impexp_work_cb - handle from
 *                              liblustre_register_wait_callback()
 *  obd_zombie_impexp_idle_cb - handle from
 *                              liblustre_register_idle_callback() */
1803 static atomic_t zombie_recur = ATOMIC_INIT(0);
1804 static void *obd_zombie_impexp_work_cb;
1805 static void *obd_zombie_impexp_idle_cb;
/* Wait-callback form of the reaper for the non-kernel build.
 * zombie_recur is incremented on entry and decremented on exit; the
 * cull runs only when the increment yields 1, i.e. when no other
 * invocation is already in progress, which prevents nested culling.
 * @arg is not used in the lines shown. */
1807 int obd_zombie_impexp_kill(void *arg)
1811 if (atomic_inc_return(&zombie_recur) == 1) {
1812 obd_zombie_impexp_cull();
1815 atomic_dec(&zombie_recur);
1822 * start destroy zombie import/export thread
/* Initialise the zombie lists, their lock, the start/stop completions
 * and the wait queue, then bring up the reaper.
 *
 * Two start-up variants appear below (presumably selected by the
 * kernel / non-kernel build conditional - confirm):
 *  - start a kthread named "obd_zombid" running
 *    obd_zombie_impexp_thread(), returning PTR_ERR(task) on the error
 *    path, and wait until the thread completes obd_zombie_start;
 *  - register obd_zombie_impexp_kill() as a liblustre wait callback and
 *    obd_zombie_impexp_check() as an idle callback, keeping both
 *    handles for later deregistration. */
1824 int obd_zombie_impexp_init(void)
1827 struct task_struct *task;
1830 INIT_LIST_HEAD(&obd_zombie_imports);
1832 INIT_LIST_HEAD(&obd_zombie_exports);
1833 spin_lock_init(&obd_zombie_impexp_lock);
1834 init_completion(&obd_zombie_start);
1835 init_completion(&obd_zombie_stop);
1836 init_waitqueue_head(&obd_zombie_waitq);
1840 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1842 RETURN(PTR_ERR(task));
1844 wait_for_completion(&obd_zombie_start);
1847 obd_zombie_impexp_work_cb =
1848 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1849 &obd_zombie_impexp_kill, NULL);
1851 obd_zombie_impexp_idle_cb =
1852 liblustre_register_idle_callback("obd_zombi_impexp_check",
1853 &obd_zombie_impexp_check, NULL);
1858 * stop destroy zombie import/export thread
/* Shut the reaper down.
 * Sets the OBD_ZOMBIE_STOP bit, wakes all waiters so the thread notices
 * it, and blocks until the thread completes obd_zombie_stop.  The
 * liblustre wait and idle callbacks registered at init time are
 * deregistered (presumably the non-kernel counterpart of the thread
 * shutdown - confirm against the build conditional). */
1860 void obd_zombie_impexp_stop(void)
1862 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1863 obd_zombie_impexp_notify();
1865 wait_for_completion(&obd_zombie_stop);
1867 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1868 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1872 /***** Kernel-userspace comm helpers *******/
1874 /* Get length of entire message, including header */
1875 int kuc_len(int payload_len)
1877 return sizeof(struct kuc_hdr) + payload_len;
1879 EXPORT_SYMBOL(kuc_len);
1881 /* Get a pointer to kuc header, given a ptr to the payload
1882 * @param p Pointer to payload area
1883 * @returns Pointer to kuc header
/* The header is taken to sit immediately before the payload: @p is
 * stepped back by one struct kuc_hdr, and the result is asserted to
 * carry KUC_MAGIC.  Passing a pointer that is not a KUC payload
 * therefore reads memory in front of @p and trips the assertion. */
1885 struct kuc_hdr * kuc_ptr(void *p)
1887 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1888 LASSERT(lh->kuc_magic == KUC_MAGIC);
1891 EXPORT_SYMBOL(kuc_ptr);
1893 /* Test if payload is part of kuc message
1894 * @param p Pointer to payload area
/* Non-asserting counterpart of kuc_ptr(): inspects the struct kuc_hdr
 * sized region immediately before @p and tests its kuc_magic field
 * against KUC_MAGIC.  The memory in front of @p must be readable for
 * any pointer passed in. */
1897 int kuc_ispayload(void *p)
1899 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1901 if (kh->kuc_magic == KUC_MAGIC)
1906 EXPORT_SYMBOL(kuc_ispayload);
1908 /* Alloc space for a message, and fill in header
1909 * @return Pointer to payload area
/* @param payload_len  Payload size; the message length is
 *                     kuc_len(payload_len)
 * @param transport    Stored in the header's kuc_transport field
 * @param type         Stored in the header's kuc_msgtype field
 *
 * On the allocation-failure path ERR_PTR(-ENOMEM) is returned, so
 * callers must test the result with IS_ERR() rather than against NULL.
 * On success the header is stamped with KUC_MAGIC and the total length,
 * and the address just past the header is returned.
 * NOTE(review): @payload_len is not range-checked here before the
 * length is stored in kuc_msglen - confirm callers bound it. */
1911 void *kuc_alloc(int payload_len, int transport, int type)
1914 int len = kuc_len(payload_len);
1918 return ERR_PTR(-ENOMEM);
1920 lh->kuc_magic = KUC_MAGIC;
1921 lh->kuc_transport = transport;
1922 lh->kuc_msgtype = type;
1923 lh->kuc_msglen = len;
1925 return (void *)(lh + 1);
1927 EXPORT_SYMBOL(kuc_alloc);
/* Free a KUC message given a pointer to its payload area.
 * @param p            Pointer to the payload area (not to the header);
 *                     the header in front of it is located, and its
 *                     magic asserted, by kuc_ptr()
 * @param payload_len  Payload size; the size released is
 *                     kuc_len(payload_len), so it presumably has to
 *                     match the value used when the message was
 *                     allocated
 *
 * Deliberately not declared 'inline': this is an exported function with
 * external linkage, and a plain 'inline' definition is only guaranteed
 * to emit an out-of-line symbol under GNU89 inline semantics.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);

	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
/* Per-caller wait record used by obd_get_request_slot(); it lives on
 * the waiting caller's stack. */
1937 struct obd_request_slot_waiter {
/* Link on cli->cl_loi_read_list; becoming unlinked (list_empty) is how
 * a waiter learns it has been granted a slot. */
1938 struct list_head orsw_entry;
/* Woken by obd_put_request_slot() / obd_set_max_rpcs_in_flight(). */
1939 wait_queue_head_t orsw_waitq;
/* Wait condition for obd_get_request_slot(): under cl_loi_list_lock,
 * report whether @orsw has been removed from the waiter list (its
 * orsw_entry is empty), which is what the wake-up paths do when they
 * hand a slot to a waiter. */
1943 static bool obd_request_slot_avail(struct client_obd *cli,
1944 struct obd_request_slot_waiter *orsw)
1948 client_obd_list_lock(&cli->cl_loi_list_lock);
1949 avail = !!list_empty(&orsw->orsw_entry);
1950 client_obd_list_unlock(&cli->cl_loi_list_lock);
1956 * For network flow control, the RPC sponsor needs to acquire a credit
1957 * before sending the RPC. The credits count for a connection is defined
1958 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
1959 * the subsequent RPC sponsors need to wait until others release their
1960 * credits, or the administrator increases the "cl_max_rpcs_in_flight".
/* Acquire one request slot on @cli, sleeping if none is free.
 *
 * Fast path: while cl_r_in_flight is below cl_max_rpcs_in_flight the
 * counter is simply incremented under cl_loi_list_lock.
 *
 * Slow path: an on-stack waiter is queued at the tail of
 * cl_loi_read_list and the caller sleeps (interruptibly, per LWI_INTR)
 * until the waiter has been unlinked by a wake-up path or
 * orsw_signaled is set.  The wake-up paths account the slot in
 * cl_r_in_flight on the waiter's behalf before waking it.
 *
 * After the wait the lock is re-taken to tidy up: a waiter that was
 * not signaled either gives back an already-granted slot (entry
 * unlinked) or removes itself from the list (presumably the "else"
 * case - confirm).  When and to what the return code is set on these
 * paths should be checked against the full body. */
1962 int obd_get_request_slot(struct client_obd *cli)
1964 struct obd_request_slot_waiter orsw;
1965 struct l_wait_info lwi;
1968 client_obd_list_lock(&cli->cl_loi_list_lock);
1969 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1970 cli->cl_r_in_flight++;
1971 client_obd_list_unlock(&cli->cl_loi_list_lock);
1975 init_waitqueue_head(&orsw.orsw_waitq);
1976 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1977 orsw.orsw_signaled = false;
1978 client_obd_list_unlock(&cli->cl_loi_list_lock);
1980 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1981 rc = l_wait_event(orsw.orsw_waitq,
1982 obd_request_slot_avail(cli, &orsw) ||
1986 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1987 * freed but other (such as obd_put_request_slot) is using it. */
1988 client_obd_list_lock(&cli->cl_loi_list_lock);
1990 if (!orsw.orsw_signaled) {
1991 if (list_empty(&orsw.orsw_entry))
1992 cli->cl_r_in_flight--;
1994 list_del(&orsw.orsw_entry);
1998 if (orsw.orsw_signaled) {
1999 LASSERT(list_empty(&orsw.orsw_entry));
2003 client_obd_list_unlock(&cli->cl_loi_list_lock);
2007 EXPORT_SYMBOL(obd_get_request_slot);
/* Release one request slot on @cli and pass it on if someone waits.
 *
 * Everything happens under cl_loi_list_lock: cl_r_in_flight is
 * decremented, and if the waiter list is non-empty and the count is
 * below cl_max_rpcs_in_flight, the first waiter is unlinked with
 * list_del_init() (so its list_empty() wait condition becomes true),
 * the slot is re-accounted on that waiter's behalf, and the waiter is
 * woken. */
2009 void obd_put_request_slot(struct client_obd *cli)
2011 struct obd_request_slot_waiter *orsw;
2013 client_obd_list_lock(&cli->cl_loi_list_lock);
2014 cli->cl_r_in_flight--;
2016 /* If there is free slot, wakeup the first waiter. */
2017 if (!list_empty(&cli->cl_loi_read_list) &&
2018 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2019 orsw = list_entry(cli->cl_loi_read_list.next,
2020 struct obd_request_slot_waiter, orsw_entry);
2021 list_del_init(&orsw->orsw_entry);
2022 cli->cl_r_in_flight++;
2023 wake_up(&orsw->orsw_waitq);
2025 client_obd_list_unlock(&cli->cl_loi_list_lock);
2027 EXPORT_SYMBOL(obd_put_request_slot);
2029 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2031 return cli->cl_max_rpcs_in_flight;
2033 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/* Change the RPCs-in-flight limit of @cli to @max.
 *
 * @max is range-checked up front against [1, OBD_MAX_RIF_MAX].  The new
 * limit is stored under cl_loi_list_lock, with the previous value kept
 * in "old".  The loop then hands slots to queued waiters, at most
 * "diff" of them and stopping early when the waiter list runs empty:
 * each is unlinked with list_del_init(), accounted in cl_r_in_flight,
 * and woken.  When the limit is lowered, requests already in flight are
 * not touched here. */
2035 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2037 struct obd_request_slot_waiter *orsw;
2042 if (max > OBD_MAX_RIF_MAX || max < 1)
2045 client_obd_list_lock(&cli->cl_loi_list_lock);
2046 old = cli->cl_max_rpcs_in_flight;
2047 cli->cl_max_rpcs_in_flight = max;
2050 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2051 for (i = 0; i < diff; i++) {
2052 if (list_empty(&cli->cl_loi_read_list))
2055 orsw = list_entry(cli->cl_loi_read_list.next,
2056 struct obd_request_slot_waiter, orsw_entry);
2057 list_del_init(&orsw->orsw_entry);
2058 cli->cl_r_in_flight++;
2059 wake_up(&orsw->orsw_waitq);
2061 client_obd_list_unlock(&cli->cl_loi_list_lock);
2065 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);