4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include <obd_class.h>
44 #include <lprocfs_status.h>
/* Held in this file around every walk or update of the obd_types list. */
46 spinlock_t obd_types_lock;
/* Slab caches: set up by obd_init_caches(), torn down by obd_cleanup_caches(). */
48 static struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 static struct kmem_cache *import_cachep;
/*
 * Lists of imports/exports whose last reference was dropped, plus the lock
 * named for them. The code that drains these lists is outside this excerpt.
 */
53 static struct list_head obd_zombie_imports;
54 static struct list_head obd_zombie_exports;
55 static spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of helpers defined outside the visible part of the file. */
57 static void obd_zombie_impexp_notify(void);
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61 const char *status, int locks);
/*
 * Function pointer used by class_export_destroy() and class_import_destroy()
 * to drop a ptlrpc connection. Who assigns it is not visible here.
 */
63 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
64 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67 * support functions: we could use inter-module communication, but this
68 * is more portable to other OS's
/*
 * Allocate one obd_device from obd_device_cachep with GFP_NOFS and stamp it
 * with OBD_DEVICE_MAGIC. The NULL check and return statement are not visible
 * in this excerpt.
 */
70 static struct obd_device *obd_device_alloc(void)
72 struct obd_device *obd;
74 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
76 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return \a obd to obd_device_cachep.
 *
 * Asserts the magic first, complains if the device still carries a namespace,
 * and finalises the obd_reference lu_ref before freeing.
 */
81 static void obd_device_free(struct obd_device *obd)
84 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
85 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A leftover namespace is reported; what follows the message is not visible here. */
86 if (obd->obd_namespace != NULL) {
87 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
88 obd, obd->obd_namespace, obd->obd_force);
91 lu_ref_fini(&obd->obd_reference);
92 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Walk the obd_types list under obd_types_lock looking for an entry whose
 * typ_name equals \a name (exact strcmp() match).
 *
 * The lock is dropped as soon as a match is found and again after an
 * unsuccessful walk. The return statements are not visible in this excerpt;
 * presumably the match (or NULL) is returned — TODO confirm.
 */
95 struct obd_type *class_search_type(const char *name)
97 struct list_head *tmp;
98 struct obd_type *type;
100 spin_lock(&obd_types_lock);
101 list_for_each(tmp, &obd_types) {
102 type = list_entry(tmp, struct obd_type, typ_chain);
103 if (strcmp(type->typ_name, name) == 0) {
104 spin_unlock(&obd_types_lock);
108 spin_unlock(&obd_types_lock);
111 EXPORT_SYMBOL(class_search_type);
/*
 * Look up the obd type called \a name and pin the module that provides it.
 *
 * With HAVE_MODULE_LOADING_SUPPORT, a module name derived from \a name is
 * handed to request_module() and the type is searched for a second time after
 * a successful load. The condition guarding that section is not visible in
 * this excerpt; presumably it only runs when the first search found nothing
 * — TODO confirm.
 */
113 struct obd_type *class_get_type(const char *name)
115 struct obd_type *type = class_search_type(name);
117 #ifdef HAVE_MODULE_LOADING_SUPPORT
119 const char *modname = name;
/* Some type names are provided by a module with a different name. */
121 if (strcmp(modname, "obdfilter") == 0)
124 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
125 modname = LUSTRE_OSP_NAME;
/* Prefix match: any name starting with LUSTRE_MDS_NAME maps to LUSTRE_MDT_NAME. */
127 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
128 modname = LUSTRE_MDT_NAME;
/* request_module() returns 0 on success; the lookup uses the original name. */
130 if (!request_module("%s", modname)) {
131 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
132 type = class_search_type(name);
134 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
/*
 * Pin the owning module under the per-type lock.
 * NOTE(review): the result of try_module_get() is not used on this line —
 * confirm that a failed get cannot leave the caller with an unpinned type.
 */
140 spin_lock(&type->obd_type_lock);
142 try_module_get(type->typ_dt_ops->o_owner);
143 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): under the per-type lock, drop the module
 * reference on the owner of the type's dt ops.
 */
148 void class_put_type(struct obd_type *type)
151 spin_lock(&type->obd_type_lock);
153 module_put(type->typ_dt_ops->o_owner);
154 spin_unlock(&type->obd_type_lock);
/* Upper bound (including the NUL) asserted on type names at registration. */
157 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type called \a name.
 *
 * A fresh obd_type is allocated together with private copies of \a dt_ops,
 * \a md_ops and the name. When CONFIG_PROC_FS is set a proc root named after
 * the type is registered, \a ldt is initialised with lu_device_type_init(),
 * and the type is finally linked onto obd_types under obd_types_lock.
 *
 * The tail of the function is the failure path: it releases whatever was
 * allocated so far. Return statements and the exact use of \a enable_proc and
 * \a vars are not visible in this excerpt.
 */
159 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
160 bool enable_proc, struct lprocfs_vars *vars,
161 const char *name, struct lu_device_type *ldt)
163 struct obd_type *type;
168 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* Refuse to register the same name twice. */
170 if (class_search_type(name)) {
171 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
176 OBD_ALLOC(type, sizeof(*type));
180 OBD_ALLOC_PTR(type->typ_dt_ops);
181 OBD_ALLOC_PTR(type->typ_md_ops);
182 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* All three allocations are checked together. */
184 if (type->typ_dt_ops == NULL ||
185 type->typ_md_ops == NULL ||
186 type->typ_name == NULL)
/* The type keeps its own copies of the ops tables, not the caller's pointers. */
189 *(type->typ_dt_ops) = *dt_ops;
190 /* md_ops is optional */
192 *(type->typ_md_ops) = *md_ops;
/* typ_name was sized strlen(name) + 1 above, so this copy fits. */
193 strcpy(type->typ_name, name);
194 spin_lock_init(&type->obd_type_lock);
196 #ifdef CONFIG_PROC_FS
198 type->typ_procroot = lprocfs_register(type->typ_name,
/* Turn an ERR_PTR result into rc and a NULL typ_procroot so cleanup can test it. */
201 if (IS_ERR(type->typ_procroot)) {
202 rc = PTR_ERR(type->typ_procroot);
203 type->typ_procroot = NULL;
210 rc = lu_device_type_init(ldt);
/* Publish the fully initialised type. */
215 spin_lock(&obd_types_lock);
216 list_add(&type->typ_chain, &obd_types);
217 spin_unlock(&obd_types_lock);
/* Failure path: undo the partial setup in reverse order. */
222 if (type->typ_name != NULL) {
223 #ifdef CONFIG_PROC_FS
224 if (type->typ_procroot != NULL)
225 remove_proc_subtree(type->typ_name, proc_lustre_root);
227 OBD_FREE(type->typ_name, strlen(name) + 1);
229 if (type->typ_md_ops != NULL)
230 OBD_FREE_PTR(type->typ_md_ops);
231 if (type->typ_dt_ops != NULL)
232 OBD_FREE_PTR(type->typ_dt_ops);
233 OBD_FREE(type, sizeof(*type));
236 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name and free it.
 *
 * An unknown name is reported with CERROR. A type that still has a non-zero
 * typ_refcnt is reported and only its ops tables are freed. Otherwise the
 * proc entries are removed, the lu device type is finalised, the type is
 * unlinked from obd_types under obd_types_lock and all of its memory is
 * released. Return statements are not visible in this excerpt.
 */
238 int class_unregister_type(const char *name)
240 struct obd_type *type = class_search_type(name);
244 CERROR("unknown obd type\n");
/*
 * NOTE(review): the ops pointers are freed here but not cleared on the
 * visible lines, while the type appears to stay reachable — confirm nothing
 * dereferences typ_dt_ops/typ_md_ops afterwards.
 */
248 if (type->typ_refcnt) {
249 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
250 /* This is a bad situation, let's make the best of it */
251 /* Remove ops, but leave the name for debugging */
252 OBD_FREE_PTR(type->typ_dt_ops);
253 OBD_FREE_PTR(type->typ_md_ops);
257 /* we do not use type->typ_procroot as for compatibility purposes
258 * other modules can share names (i.e. lod can use lov entry). so
259 * we can't reference pointer as it can get invalidated when another
260 * module removes the entry */
261 #ifdef CONFIG_PROC_FS
262 if (type->typ_procroot != NULL)
263 remove_proc_subtree(type->typ_name, proc_lustre_root);
264 if (type->typ_procsym != NULL)
265 lprocfs_remove(&type->typ_procsym);
268 lu_device_type_fini(type->typ_lu);
/* Unlink before freeing so no list walker can find the dying type. */
270 spin_lock(&obd_types_lock);
271 list_del(&type->typ_chain);
272 spin_unlock(&obd_types_lock);
/* Sized from the caller's name, which strcmp()-matched typ_name in the search. */
273 OBD_FREE(type->typ_name, strlen(name) + 1);
274 if (type->typ_dt_ops != NULL)
275 OBD_FREE_PTR(type->typ_dt_ops);
276 if (type->typ_md_ops != NULL)
277 OBD_FREE_PTR(type->typ_md_ops);
278 OBD_FREE(type, sizeof(*type));
280 } /* class_unregister_type */
281 EXPORT_SYMBOL(class_unregister_type);
284 * Create a new obd device.
286 * Find an empty slot in ::obd_devs[], create a new obd device in it.
288 * \param[in] type_name obd device type string.
289 * \param[in] name obd device name.
291 * \retval NULL if create fails, otherwise return the obd device
/*
 * Allocate a device of type \a type_name, name it \a name and place it in a
 * free slot of obd_devs[].
 *
 * Failures visible here are reported as ERR_PTR values: -EINVAL for an
 * over-long name, -ENODEV for an unknown type, -ENOMEM when allocation fails,
 * -EEXIST for a duplicate name and -EOVERFLOW when no slot is free. On the
 * cleanup path the new device is freed and the type reference is dropped.
 */
294 struct obd_device *class_newdev(const char *type_name, const char *name)
296 struct obd_device *result = NULL;
297 struct obd_device *newdev;
298 struct obd_type *type = NULL;
300 int new_obd_minor = 0;
/* The name must fit obd_name with room for the terminating NUL. */
303 if (strlen(name) >= MAX_OBD_NAME) {
304 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
305 RETURN(ERR_PTR(-EINVAL));
/* Takes a reference on the type; released through class_put_type() on failure. */
308 type = class_get_type(type_name);
310 CERROR("OBD: unknown type: %s\n", type_name);
311 RETURN(ERR_PTR(-ENODEV));
314 newdev = obd_device_alloc();
316 GOTO(out_type, result = ERR_PTR(-ENOMEM));
318 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/*
 * One pass over the whole table under the write lock: every slot is checked
 * for a name clash, and a free slot is claimed while result is still NULL.
 */
320 write_lock(&obd_dev_lock);
321 for (i = 0; i < class_devno_max(); i++) {
322 struct obd_device *obd = class_num2obd(i);
324 if (obd && (strcmp(name, obd->obd_name) == 0)) {
325 CERROR("Device %s already exists at %d, won't add\n",
/*
 * Undo a slot that was claimed earlier in this same pass. The guard in front
 * of these lines is not visible in this excerpt.
 */
328 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
329 "%p obd_magic %08x != %08x\n", result,
330 result->obd_magic, OBD_DEVICE_MAGIC);
331 LASSERTF(result->obd_minor == new_obd_minor,
332 "%p obd_minor %d != %d\n", result,
333 result->obd_minor, new_obd_minor);
335 obd_devs[result->obd_minor] = NULL;
336 result->obd_name[0]='\0';
338 result = ERR_PTR(-EEXIST);
/* Claim this empty slot if none has been claimed yet. */
341 if (!result && !obd) {
343 result->obd_minor = i;
345 result->obd_type = type;
/* At most sizeof - 1 bytes are copied; the length was checked at the top. */
346 strncpy(result->obd_name, name,
347 sizeof(result->obd_name) - 1);
348 obd_devs[i] = result;
351 write_unlock(&obd_dev_lock);
/* Loop ran to the end without claiming a slot: the table is full. */
353 if (result == NULL && i >= class_devno_max()) {
354 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
356 GOTO(out, result = ERR_PTR(-EOVERFLOW));
362 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
363 result->obd_name, result);
/* Failure cleanup: free the unused device, then drop the type reference. */
367 obd_device_free(newdev);
369 class_put_type(type);
/*
 * Remove \a obd from obd_devs[] and free it.
 *
 * Asserts that the device is valid and really occupies the slot named by its
 * obd_minor, clears the slot under the write lock, frees the device and
 * finally drops the reference on its type.
 */
373 void class_release_dev(struct obd_device *obd)
/* Saved up front because obd is freed before the type reference is dropped. */
375 struct obd_type *obd_type = obd->obd_type;
377 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
378 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
379 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
380 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
381 LASSERT(obd_type != NULL);
383 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
384 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
386 write_lock(&obd_dev_lock);
387 obd_devs[obd->obd_minor] = NULL;
388 write_unlock(&obd_dev_lock);
389 obd_device_free(obd);
391 class_put_type(obd_type);
/*
 * Search obd_devs[] under the read lock for a device named \a name.
 *
 * Only a device whose obd_attached flag is set ends the search early. The
 * return statements are not visible in this excerpt; callers in this file
 * treat a negative result as "not found".
 */
394 int class_name2dev(const char *name)
401 read_lock(&obd_dev_lock);
402 for (i = 0; i < class_devno_max(); i++) {
403 struct obd_device *obd = class_num2obd(i);
405 if (obd && strcmp(name, obd->obd_name) == 0) {
406 /* Make sure we finished attaching before we give
407 out any references */
408 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
409 if (obd->obd_attached) {
410 read_unlock(&obd_dev_lock);
416 read_unlock(&obd_dev_lock);
421 struct obd_device *class_name2obd(const char *name)
423 int dev = class_name2dev(name);
425 if (dev < 0 || dev > class_devno_max())
427 return class_num2obd(dev);
429 EXPORT_SYMBOL(class_name2obd);
/*
 * Search obd_devs[] under the read lock for a device whose obd_uuid equals
 * \a uuid. The read lock is dropped on a match and after an unsuccessful
 * scan. The return statements are not visible in this excerpt.
 */
431 int class_uuid2dev(struct obd_uuid *uuid)
435 read_lock(&obd_dev_lock);
436 for (i = 0; i < class_devno_max(); i++) {
437 struct obd_device *obd = class_num2obd(i);
439 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
440 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
441 read_unlock(&obd_dev_lock);
445 read_unlock(&obd_dev_lock);
/*
 * Look up an obd device by uuid: resolve the index with class_uuid2dev() and
 * fetch the device with class_num2obd(). Any check between the two calls is
 * not visible in this excerpt.
 */
450 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
452 int dev = class_uuid2dev(uuid);
455 return class_num2obd(dev);
457 EXPORT_SYMBOL(class_uuid2obd);
460 * Get obd device from ::obd_devs[]
462 * \param num [in] array index
464 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
465 * otherwise return the obd device there.
467 struct obd_device *class_num2obd(int num)
469 struct obd_device *obd = NULL;
471 if (num < class_devno_max()) {
476 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
477 "%p obd_magic %08x != %08x\n",
478 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
479 LASSERTF(obd->obd_minor == num,
480 "%p obd_minor %0d != %0d\n",
481 obd, obd->obd_minor, num);
488 * Get obd devices count. Device in any
490 * \retval obd device count
492 int get_devices_count(void)
494 int index, max_index = class_devno_max(), dev_count = 0;
496 read_lock(&obd_dev_lock);
497 for (index = 0; index <= max_index; index++) {
498 struct obd_device *obd = class_num2obd(index);
502 read_unlock(&obd_dev_lock);
506 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per device in obd_devs[], under the read lock:
 * slot index, status string, type name, device name, uuid and refcount.
 *
 * The status string is chosen from the obd_stopping, obd_set_up and
 * obd_attached flags, tested in that order. The strings themselves are
 * assigned on lines not visible in this excerpt.
 */
508 void class_obd_list(void)
513 read_lock(&obd_dev_lock);
514 for (i = 0; i < class_devno_max(); i++) {
515 struct obd_device *obd = class_num2obd(i);
519 if (obd->obd_stopping)
521 else if (obd->obd_set_up)
523 else if (obd->obd_attached)
527 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
528 i, status, obd->obd_type->typ_name,
529 obd->obd_name, obd->obd_uuid.uuid,
530 atomic_read(&obd->obd_refcount));
532 read_unlock(&obd_dev_lock);
536 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
537 specified, then only the client with that uuid is returned,
538 otherwise any client connected to the tgt is returned. */
539 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
540 const char * typ_name,
541 struct obd_uuid *grp_uuid)
545 read_lock(&obd_dev_lock);
546 for (i = 0; i < class_devno_max(); i++) {
547 struct obd_device *obd = class_num2obd(i);
/*
 * Prefix comparison: only strlen(typ_name) bytes are compared, so a device
 * type whose name merely starts with typ_name also matches.
 */
551 if ((strncmp(obd->obd_type->typ_name, typ_name,
552 strlen(typ_name)) == 0)) {
/* Target uuid must match; the device's own uuid is compared only if grp_uuid is given. */
553 if (obd_uuid_equals(tgt_uuid,
554 &obd->u.cli.cl_target_uuid) &&
555 ((grp_uuid)? obd_uuid_equals(grp_uuid,
556 &obd->obd_uuid) : 1)) {
557 read_unlock(&obd_dev_lock);
562 read_unlock(&obd_dev_lock);
566 EXPORT_SYMBOL(class_find_client_obd);
568 /* Iterate the obd_device list looking devices have grp_uuid. Start
569 searching at *next, and if a device is found, the next index to look
570 at is saved in *next. If next is NULL, then the first matching device
571 will always be returned. */
572 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
/* A caller-supplied start index is only accepted inside [0, class_devno_max()). */
578 else if (*next >= 0 && *next < class_devno_max())
/* Scan from the start index to the end of the table under the read lock. */
583 read_lock(&obd_dev_lock);
584 for (; i < class_devno_max(); i++) {
585 struct obd_device *obd = class_num2obd(i);
589 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
592 read_unlock(&obd_dev_lock);
596 read_unlock(&obd_dev_lock);
600 EXPORT_SYMBOL(class_devices_in_group);
603 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
604 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF through obd_set_info_async() to every matching device.
 *
 * A device matches when it is set up and not stopping, its type is one of
 * mdc/osc/mdt/ost, and the first \a namelen bytes of its name equal
 * \a fsname. How the per-device results are combined into the return value
 * is not visible in this excerpt.
 */
606 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
608 struct obd_device *obd;
612 LASSERT(namelen > 0);
614 read_lock(&obd_dev_lock);
615 for (i = 0; i < class_devno_max(); i++) {
616 obd = class_num2obd(i);
618 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
621 /* only notify mdc, osc, mdt, ost */
622 type = obd->obd_type->typ_name;
623 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
624 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
625 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
626 strcmp(type, LUSTRE_OST_NAME) != 0)
/* Device names are matched by their leading namelen bytes. */
629 if (strncmp(obd->obd_name, fsname, namelen))
/*
 * Hold a device reference so the read lock can be dropped around the
 * obd_set_info_async() call; the lock is re-taken before the scan continues.
 */
632 class_incref(obd, __FUNCTION__, obd);
633 read_unlock(&obd_dev_lock);
634 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
635 sizeof(KEY_SPTLRPC_CONF),
636 KEY_SPTLRPC_CONF, 0, NULL, NULL);
638 class_decref(obd, __FUNCTION__, obd);
639 read_lock(&obd_dev_lock);
641 read_unlock(&obd_dev_lock);
644 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches(): obd_device_cachep,
 * obdo_cachep, import_cachep and capa_cachep. The guards and pointer resets
 * for some of the caches are on lines not visible in this excerpt.
 */
646 void obd_cleanup_caches(void)
/* Pointer is reset to NULL after destruction. */
649 if (obd_device_cachep) {
650 kmem_cache_destroy(obd_device_cachep);
651 obd_device_cachep = NULL;
654 kmem_cache_destroy(obdo_cachep);
658 kmem_cache_destroy(import_cachep);
659 import_cachep = NULL;
662 kmem_cache_destroy(capa_cachep);
/*
 * Create the four slab caches used by this file (obd devices, obdo, imports,
 * capabilities). Each cache pointer is asserted to be NULL beforehand.
 *
 * A failed creation jumps to "out" with rc = -ENOMEM, and the tail of the
 * function calls obd_cleanup_caches() to discard the caches already created.
 */
668 int obd_init_caches(void)
673 LASSERT(obd_device_cachep == NULL);
674 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
675 sizeof(struct obd_device),
677 if (!obd_device_cachep)
678 GOTO(out, rc = -ENOMEM);
680 LASSERT(obdo_cachep == NULL);
681 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
684 GOTO(out, rc = -ENOMEM);
686 LASSERT(import_cachep == NULL);
687 import_cachep = kmem_cache_create("ll_import_cache",
688 sizeof(struct obd_import),
691 GOTO(out, rc = -ENOMEM);
693 LASSERT(capa_cachep == NULL);
694 capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
697 GOTO(out, rc = -ENOMEM);
/* Error path: discard any caches created before the failure. */
701 obd_cleanup_caches();
705 /* map connection to client */
/*
 * Two special cases are logged before the real lookup: a null handle and a
 * cookie of -1 (a request for a new connection). Any other cookie is resolved
 * with class_handle2object(). Return statements are not visible in this
 * excerpt.
 */
706 struct obd_export *class_conn2export(struct lustre_handle *conn)
708 struct obd_export *export;
712 CDEBUG(D_CACHE, "looking for null handle\n");
716 if (conn->cookie == -1) { /* this means assign a new connection */
717 CDEBUG(D_CACHE, "want a new connection\n");
721 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
722 export = class_handle2object(conn->cookie, NULL);
725 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd device. The body is not visible in this excerpt;
 * presumably it returns exp->exp_obd when \a exp is non-NULL — TODO confirm.
 */
727 struct obd_device *class_exp2obd(struct obd_export *exp)
733 EXPORT_SYMBOL(class_exp2obd);
/*
 * Map a connection handle to the obd device behind its export.
 *
 * The export is looked up with class_conn2export(), its exp_obd is read, and
 * the export is released again with class_export_put().
 */
735 struct obd_device *class_conn2obd(struct lustre_handle *conn)
737 struct obd_export *export;
738 export = class_conn2export(conn);
740 struct obd_device *obd = export->exp_obd;
741 class_export_put(export);
/*
 * Return the client import (u.cli.cl_import) of the device that owns \a exp.
 * Any NULL check on the device is on a line not visible in this excerpt.
 */
747 struct obd_import *class_exp2cliimp(struct obd_export *exp)
749 struct obd_device *obd = exp->exp_obd;
752 return obd->u.cli.cl_import;
754 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the device that \a conn
 * resolves to via class_conn2obd(). Any NULL check on the device is on a
 * line not visible in this excerpt.
 */
756 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
758 struct obd_device *obd = class_conn2obd(conn);
761 return obd->u.cli.cl_import;
764 /* Export management functions */
/*
 * Final teardown of an export whose refcount has reached zero.
 *
 * Drops the export's connection (if any), asserts that its request and reply
 * lists are empty, runs obd_destroy_export(), releases the "export" reference
 * on the owning device and frees the export through OBD_FREE_RCU().
 */
765 static void class_export_destroy(struct obd_export *exp)
767 struct obd_device *obd = exp->exp_obd;
770 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
771 LASSERT(obd != NULL);
773 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
774 exp->exp_client_uuid.uuid, obd->obd_name);
776 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
777 if (exp->exp_connection)
778 ptlrpc_put_connection_superhack(exp->exp_connection);
780 LASSERT(list_empty(&exp->exp_outstanding_replies));
781 LASSERT(list_empty(&exp->exp_uncommitted_replies));
782 LASSERT(list_empty(&exp->exp_req_replay_queue));
783 LASSERT(list_empty(&exp->exp_hp_rpcs));
784 obd_destroy_export(exp);
/* Pairs with class_incref(obd, "export", export) in class_new_export(). */
785 class_decref(obd, "export", exp);
787 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle callback: take an export reference on behalf of the handle code. */
791 static void export_handle_addref(void *export)
793 class_export_get(export);
/* Handle operations passed to class_handle_hash() for every new export. */
796 static struct portals_handle_ops export_handle_ops = {
797 .hop_addref = export_handle_addref,
/*
 * Take one reference on \a exp and log the new count. The return statement
 * is not visible in this excerpt.
 */
801 struct obd_export *class_export_get(struct obd_export *exp)
803 atomic_inc(&exp->exp_refcount);
804 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
805 atomic_read(&exp->exp_refcount));
808 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp.
 *
 * When the last reference goes away, the export's nid-stat reference is
 * released and the export is passed to obd_zombie_export_add(). It is not
 * destroyed directly from here.
 */
810 void class_export_put(struct obd_export *exp)
812 LASSERT(exp != NULL);
813 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* Logged before the decrement, hence the "- 1". */
814 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
815 atomic_read(&exp->exp_refcount) - 1);
817 if (atomic_dec_and_test(&exp->exp_refcount)) {
818 LASSERT(!list_empty(&exp->exp_obd_chain));
819 CDEBUG(D_IOCTL, "final put %p/%s\n",
820 exp, exp->exp_client_uuid.uuid);
822 /* release nid stat reference */
823 lprocfs_exp_cleanup(exp);
825 obd_zombie_export_add(exp);
828 EXPORT_SYMBOL(class_export_put);
830 /* Creates a new export, adds it to the hash table, and returns a
831 * pointer to it. The refcount is 2: one for the hash reference, and
832 * one for the pointer returned by this function. */
/*
 * Failure modes visible here: -ENOMEM from allocation, -ENODEV when the
 * device is stopping or has no uuid hash, and -EALREADY when the client uuid
 * is already hashed. The tail of the function unhashes and frees the
 * half-built export.
 */
833 struct obd_export *class_new_export(struct obd_device *obd,
834 struct obd_uuid *cluuid)
836 struct obd_export *export;
837 cfs_hash_t *hash = NULL;
841 OBD_ALLOC_PTR(export);
843 return ERR_PTR(-ENOMEM);
/* Initialise counters, locks and list heads before the export becomes visible. */
845 export->exp_conn_cnt = 0;
846 export->exp_lock_hash = NULL;
847 export->exp_flock_hash = NULL;
848 atomic_set(&export->exp_refcount, 2);
849 atomic_set(&export->exp_rpc_count, 0);
850 atomic_set(&export->exp_cb_count, 0);
851 atomic_set(&export->exp_locks_count, 0);
852 #if LUSTRE_TRACKS_LOCK_EXP_REFS
853 INIT_LIST_HEAD(&export->exp_locks_list);
854 spin_lock_init(&export->exp_locks_list_guard);
856 atomic_set(&export->exp_replay_count, 0);
857 export->exp_obd = obd;
858 INIT_LIST_HEAD(&export->exp_outstanding_replies);
859 spin_lock_init(&export->exp_uncommitted_replies_lock);
860 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
861 INIT_LIST_HEAD(&export->exp_req_replay_queue);
862 INIT_LIST_HEAD(&export->exp_handle.h_link);
863 INIT_LIST_HEAD(&export->exp_hp_rpcs);
864 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* Registers the export's handle together with export_handle_ops. */
865 class_handle_hash(&export->exp_handle, &export_handle_ops);
866 export->exp_last_request_time = cfs_time_current_sec();
867 spin_lock_init(&export->exp_lock);
868 spin_lock_init(&export->exp_rpc_lock);
869 INIT_HLIST_NODE(&export->exp_uuid_hash);
870 INIT_HLIST_NODE(&export->exp_nid_hash);
871 spin_lock_init(&export->exp_bl_list_lock);
872 INIT_LIST_HEAD(&export->exp_bl_list);
874 export->exp_sp_peer = LUSTRE_SP_ANY;
875 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
876 export->exp_client_uuid = *cluuid;
877 obd_init_export(export);
/* First check under the device lock: bail out early if the device is going away. */
879 spin_lock(&obd->obd_dev_lock);
880 /* shouldn't happen, but might race */
881 if (obd->obd_stopping)
882 GOTO(exit_unlock, rc = -ENODEV);
/* Hold a reference on the uuid hash so it can be used after the lock is dropped. */
884 hash = cfs_hash_getref(obd->obd_uuid_hash);
886 GOTO(exit_unlock, rc = -ENODEV);
887 spin_unlock(&obd->obd_dev_lock);
/* An export whose client uuid equals the device's own uuid is not hashed by uuid. */
889 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
890 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
892 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
893 obd->obd_name, cluuid->uuid, rc);
894 GOTO(exit_err, rc = -EALREADY);
898 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* Re-check obd_stopping: the device lock was released while hashing. */
899 spin_lock(&obd->obd_dev_lock);
900 if (obd->obd_stopping) {
901 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
902 GOTO(exit_unlock, rc = -ENODEV);
/* Success: pin the device and link the export onto its export lists. */
905 class_incref(obd, "export", export);
906 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
907 list_add_tail(&export->exp_obd_chain_timed,
908 &export->exp_obd->obd_exports_timed);
909 export->exp_obd->obd_num_exports++;
910 spin_unlock(&obd->obd_dev_lock);
911 cfs_hash_putref(hash);
/* Error exits: drop the lock and hash reference, then unhash and free the export. */
915 spin_unlock(&obd->obd_dev_lock);
918 cfs_hash_putref(hash);
919 class_handle_unhash(&export->exp_handle);
920 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
921 obd_destroy_export(export);
922 OBD_FREE_PTR(export);
925 EXPORT_SYMBOL(class_new_export);
/*
 * Detach \a exp from its device.
 *
 * Unhashes the handle, then under the device lock removes the export from
 * the uuid hash (if hashed), moves it to obd_unlinked_exports, takes it off
 * the timed list and decrements obd_num_exports. One export reference is
 * dropped at the end.
 */
927 void class_unlink_export(struct obd_export *exp)
929 class_handle_unhash(&exp->exp_handle);
931 spin_lock(&exp->exp_obd->obd_dev_lock);
932 /* delete an uuid-export hashitem from hashtables */
933 if (!hlist_unhashed(&exp->exp_uuid_hash))
934 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
935 &exp->exp_client_uuid,
936 &exp->exp_uuid_hash);
938 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
939 list_del_init(&exp->exp_obd_chain_timed);
940 exp->exp_obd->obd_num_exports--;
941 spin_unlock(&exp->exp_obd->obd_dev_lock);
942 class_export_put(exp);
945 /* Import management functions */
/*
 * Final teardown of an import whose refcount has reached zero.
 *
 * Drops the import's connection, releases every entry on imp_conn_list
 * together with its connection, releases the "import" reference on the
 * owning device and frees the import through OBD_FREE_RCU().
 */
946 static void class_import_destroy(struct obd_import *imp)
950 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
951 imp->imp_obd->obd_name);
953 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
955 ptlrpc_put_connection_superhack(imp->imp_connection);
/* Pop entries from the head until the list is empty. */
957 while (!list_empty(&imp->imp_conn_list)) {
958 struct obd_import_conn *imp_conn;
960 imp_conn = list_entry(imp->imp_conn_list.next,
961 struct obd_import_conn, oic_item);
962 list_del_init(&imp_conn->oic_item);
963 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
964 OBD_FREE(imp_conn, sizeof(*imp_conn));
967 LASSERT(imp->imp_sec == NULL);
/* Pairs with class_incref(obd, "import", imp) in class_new_import(). */
968 class_decref(imp->imp_obd, "import", imp);
969 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Handle callback: take an import reference on behalf of the handle code. */
973 static void import_handle_addref(void *import)
975 class_import_get(import);
/* Handle operations passed to class_handle_hash() for every new import. */
978 static struct portals_handle_ops import_handle_ops = {
979 .hop_addref = import_handle_addref,
/*
 * Take one reference on \a import and log the new count. The return
 * statement is not visible in this excerpt.
 */
983 struct obd_import *class_import_get(struct obd_import *import)
985 atomic_inc(&import->imp_refcount);
986 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
987 atomic_read(&import->imp_refcount),
988 import->imp_obd->obd_name);
991 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp. When the last reference goes away the import
 * is passed to obd_zombie_import_add(); it is not destroyed directly from
 * here.
 */
993 void class_import_put(struct obd_import *imp)
/* An import already queued on the zombie chain must not be put again. */
997 LASSERT(list_empty(&imp->imp_zombie_chain));
998 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* Logged before the decrement, hence the "- 1". */
1000 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1001 atomic_read(&imp->imp_refcount) - 1,
1002 imp->imp_obd->obd_name);
1004 if (atomic_dec_and_test(&imp->imp_refcount)) {
1005 CDEBUG(D_INFO, "final put import %p\n", imp);
1006 obd_zombie_import_add(imp);
1009 /* catch possible import put race */
1010 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1013 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the net latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service estimates
 * starts at INITIAL_CONNECT_TIMEOUT. The last argument of the per-portal
 * at_init() call is on a line not visible in this excerpt.
 */
1015 static void init_imp_at(struct imp_at *at) {
1017 at_init(&at->iat_net_latency, 0, 0);
1018 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1019 /* max service estimates are tracked on the server side, so
1020 don't use the AT history here, just use the last reported
1021 val. (But keep hist for proc histogram, worst_ever) */
1022 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialise a new import for \a obd.
 *
 * The import starts in state LUSTRE_IMP_NEW with a refcount of 2, takes an
 * "import" reference on \a obd, and has its handle registered with
 * import_handle_ops. The allocation check and return statement are not
 * visible in this excerpt.
 */
1027 struct obd_import *class_new_import(struct obd_device *obd)
1029 struct obd_import *imp;
1031 OBD_ALLOC(imp, sizeof(*imp));
1035 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1036 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1037 INIT_LIST_HEAD(&imp->imp_replay_list);
1038 INIT_LIST_HEAD(&imp->imp_sending_list);
1039 INIT_LIST_HEAD(&imp->imp_delayed_list);
1040 INIT_LIST_HEAD(&imp->imp_committed_list);
/* The replay cursor starts at the head of the (empty) committed list. */
1041 imp->imp_replay_cursor = &imp->imp_committed_list;
1042 spin_lock_init(&imp->imp_lock);
1043 imp->imp_last_success_conn = 0;
1044 imp->imp_state = LUSTRE_IMP_NEW;
/* Released again by class_decref(..., "import", imp) in class_import_destroy(). */
1045 imp->imp_obd = class_incref(obd, "import", imp);
1046 mutex_init(&imp->imp_sec_mutex);
1047 init_waitqueue_head(&imp->imp_recovery_waitq);
1049 atomic_set(&imp->imp_refcount, 2);
1050 atomic_set(&imp->imp_unregistering, 0);
1051 atomic_set(&imp->imp_inflight, 0);
1052 atomic_set(&imp->imp_replay_inflight, 0);
1053 atomic_set(&imp->imp_inval_count, 0);
1054 INIT_LIST_HEAD(&imp->imp_conn_list);
1055 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1056 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1057 init_imp_at(&imp->imp_at);
1059 /* the default magic is V2, will be used in connect RPC, and
1060 * then adjusted according to the flags in request/reply. */
1061 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1065 EXPORT_SYMBOL(class_new_import);
/*
 * Begin tearing down \a import: unhash its handle, bump imp_generation under
 * imp_lock and drop one reference with class_import_put().
 */
1067 void class_destroy_import(struct obd_import *import)
1069 LASSERT(import != NULL);
1070 LASSERT(import != LP_POISON);
1072 class_handle_unhash(&import->imp_handle);
1074 spin_lock(&import->imp_lock);
1075 import->imp_generation++;
1076 spin_unlock(&import->imp_lock);
1077 class_import_put(import);
1079 EXPORT_SYMBOL(class_destroy_import);
1081 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug bookkeeping: record that \a lock holds a reference on \a exp.
 *
 * Under exp_locks_list_guard the lock's l_exp_refs_nr is incremented. On the
 * first reference the lock is linked onto exp_locks_list and \a exp becomes
 * its l_exp_refs_target. A lock already tracked against a different export
 * is reported with a console warning.
 */
1083 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1085 spin_lock(&exp->exp_locks_list_guard);
1087 LASSERT(lock->l_exp_refs_nr >= 0);
1089 if (lock->l_exp_refs_target != NULL &&
1090 lock->l_exp_refs_target != exp) {
1091 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1092 exp, lock, lock->l_exp_refs_target);
/* Post-increment: the branch is taken only for the first reference. */
1094 if ((lock->l_exp_refs_nr ++) == 0) {
1095 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1096 lock->l_exp_refs_target = exp;
1098 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1099 lock, exp, lock->l_exp_refs_nr);
1100 spin_unlock(&exp->exp_locks_list_guard);
/*
 * Debug bookkeeping: drop one reference recorded by
 * __class_export_add_lock_ref().
 *
 * Under exp_locks_list_guard the lock's l_exp_refs_nr is decremented. When it
 * reaches zero the lock is unlinked from the export's list and its target is
 * cleared. A target that differs from \a exp is reported with a console
 * warning.
 */
1103 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1105 spin_lock(&exp->exp_locks_list_guard);
1106 LASSERT(lock->l_exp_refs_nr > 0);
1107 if (lock->l_exp_refs_target != exp) {
1108 LCONSOLE_WARN("lock %p, "
1109 "mismatching export pointers: %p, %p\n",
1110 lock, lock->l_exp_refs_target, exp);
/* Pre-decrement: the branch is taken when the last reference goes away. */
1112 if (-- lock->l_exp_refs_nr == 0) {
1113 list_del_init(&lock->l_exp_refs_link);
1114 lock->l_exp_refs_target = NULL;
1116 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1117 lock, exp, lock->l_exp_refs_nr);
1118 spin_unlock(&exp->exp_locks_list_guard);
/* A connection defines an export context in which preallocation can
 * be managed. This releases the export pointer reference, and returns
 * the export handle, so the export refcount is 1 when this function
 * returns. */
/*
 * Create a new export on \a obd for client \a cluuid and hand its handle
 * cookie back through \a conn.
 *
 * class_new_export() returns the export with two references; one of them is
 * dropped here before returning. A failed class_new_export() is propagated as
 * PTR_ERR(export). The guard in front of that return is not visible in this
 * excerpt.
 */
1126 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1127 struct obd_uuid *cluuid)
1129 struct obd_export *export;
1130 LASSERT(conn != NULL);
1131 LASSERT(obd != NULL);
1132 LASSERT(cluuid != NULL);
1135 export = class_new_export(obd, cluuid);
1137 RETURN(PTR_ERR(export));
/* The caller identifies the export by this cookie from now on. */
1139 conn->cookie = export->exp_handle.h_cookie;
1140 class_export_put(export);
1142 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1143 cluuid->uuid, conn->cookie);
1146 EXPORT_SYMBOL(class_connect);
1148 /* if export is involved in recovery then clean up related things */
/*
 * Two phases: first, under obd_recovery_task_lock, the device's recovery
 * accounting is adjusted if recovery is in progress; then, under exp_lock,
 * the export's pending request-replay and lock-replay flags are cleared and
 * the matching per-device counters are decremented.
 */
1149 static void class_export_recovery_cleanup(struct obd_export *exp)
1151 struct obd_device *obd = exp->exp_obd;
1153 spin_lock(&obd->obd_recovery_task_lock);
1154 if (obd->obd_recovering) {
/* The export was taking part in recovery: un-count it as a connected client. */
1155 if (exp->exp_in_recovery) {
1156 spin_lock(&exp->exp_lock);
1157 exp->exp_in_recovery = 0;
1158 spin_unlock(&exp->exp_lock);
1159 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1160 atomic_dec(&obd->obd_connected_clients);
1163 /* if called during recovery then should update
1164 * obd_stale_clients counter,
1165 * lightweight exports are not counted */
1166 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1167 exp->exp_obd->obd_stale_clients++;
1169 spin_unlock(&obd->obd_recovery_task_lock);
1171 spin_lock(&exp->exp_lock);
1172 /** Cleanup req replay fields */
1173 if (exp->exp_req_replay_needed) {
1174 exp->exp_req_replay_needed = 0;
1176 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1177 atomic_dec(&obd->obd_req_replay_clients);
1180 /** Cleanup lock replay data */
1181 if (exp->exp_lock_replay_needed) {
1182 exp->exp_lock_replay_needed = 0;
1184 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1185 atomic_dec(&obd->obd_lock_replay_clients);
1187 spin_unlock(&exp->exp_lock);
1190 /* This function removes 1-3 references from the export:
1191 * 1 - for export pointer passed
1192 * and if disconnect really need
1193 * 2 - removing from hash
1194 * 3 - in client_unlink_export
1195 * The export pointer passed to this function can destroyed */
1196 int class_disconnect(struct obd_export *export)
1198 int already_disconnected;
1201 if (export == NULL) {
1202 CWARN("attempting to free NULL export %p\n", export);
/* Test-and-set exp_disconnected under exp_lock so only one caller does the real work. */
1206 spin_lock(&export->exp_lock);
1207 already_disconnected = export->exp_disconnected;
1208 export->exp_disconnected = 1;
1209 spin_unlock(&export->exp_lock);
1211 /* class_cleanup(), abort_recovery(), and class_fail_export()
1212 * all end up in here, and if any of them race we shouldn't
1213 * call extra class_export_puts(). */
1214 if (already_disconnected) {
1215 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1216 GOTO(no_disconn, already_disconnected);
1219 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1220 export->exp_handle.h_cookie);
/* Remove the export from the device's nid hash if it is hashed there. */
1222 if (!hlist_unhashed(&export->exp_nid_hash))
1223 cfs_hash_del(export->exp_obd->obd_nid_hash,
1224 &export->exp_connection->c_peer.nid,
1225 &export->exp_nid_hash);
1227 class_export_recovery_cleanup(export);
1228 class_unlink_export(export);
/* Drops the reference for the pointer passed in (item 1 in the header comment). */
1230 class_export_put(export);
1233 EXPORT_SYMBOL(class_disconnect);
1235 /* Return non-zero for a fully connected export */
/*
 * "Fully connected" means exp_conn_cnt > 0 and exp_failed not set, sampled
 * under exp_lock. Handling of a NULL \a exp and the return statement are not
 * visible in this excerpt.
 */
1236 int class_connected_export(struct obd_export *exp)
1241 spin_lock(&exp->exp_lock);
1242 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1243 spin_unlock(&exp->exp_lock);
1247 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list, stamping each with \a flags first.
 *
 * The loop always works on the head of the list and relies on the export
 * leaving the list, either through obd_disconnect() or through the explicit
 * list_del_init() used for the device's self-export.
 */
1249 static void class_disconnect_export_list(struct list_head *list,
1250 enum obd_option flags)
1253 struct obd_export *exp;
1256 /* It's possible that an export may disconnect itself, but
1257 * nothing else will be added to this list. */
1258 while (!list_empty(list)) {
1259 exp = list_entry(list->next, struct obd_export,
1261 /* need for safe call CDEBUG after obd_disconnect */
1262 class_export_get(exp);
1264 spin_lock(&exp->exp_lock);
1265 exp->exp_flags = flags;
1266 spin_unlock(&exp->exp_lock);
/* Self-export (client uuid equals device uuid): unlink it, do not disconnect it. */
1268 if (obd_uuid_equals(&exp->exp_client_uuid,
1269 &exp->exp_obd->obd_uuid)) {
1271 "exp %p export uuid == obd uuid, don't discon\n",
1273 /* Need to delete this now so we don't end up pointing
1274 * to work_list later when this export is cleaned up. */
1275 list_del_init(&exp->exp_obd_chain);
1276 class_export_put(exp);
/* Second reference: per the comment below, obd_disconnect() releases one. */
1280 class_export_get(exp);
1281 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1282 "last request at "CFS_TIME_T"\n",
1283 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1284 exp, exp->exp_last_request_time);
1285 /* release one export reference anyway */
1286 rc = obd_disconnect(exp);
1288 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1289 obd_export_nid2str(exp), exp, rc);
/* Drops the reference taken at the top of the loop for the debug message. */
1290 class_export_put(exp);
/*
 * Disconnect all exports of @obd.  The entries of obd_exports and
 * obd_delayed_exports are spliced onto a private list under obd_dev_lock;
 * the disconnects run from that list after the lock has been dropped, using
 * the flags returned by exp_flags_from_obd(obd).
 */
1295 void class_disconnect_exports(struct obd_device *obd)
1297 struct list_head work_list;
1300 /* Move all of the exports from obd_exports to a work list, en masse. */
1301 INIT_LIST_HEAD(&work_list);
1302 spin_lock(&obd->obd_dev_lock);
1303 list_splice_init(&obd->obd_exports, &work_list);
1304 list_splice_init(&obd->obd_delayed_exports, &work_list);
1305 spin_unlock(&obd->obd_dev_lock);
/* Both outcomes are only logged at D_HA; nothing is returned to the caller. */
1307 if (!list_empty(&work_list)) {
1308 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1309 "disconnecting them\n", obd->obd_minor, obd);
1310 class_disconnect_export_list(&work_list,
1311 exp_flags_from_obd(obd));
1313 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1314 obd->obd_minor, obd);
1317 EXPORT_SYMBOL(class_disconnect_exports);
/*
 * Collect stale exports of @obd on a private list under obd_dev_lock, then
 * disconnect them with OBD_OPT_ABORT_RECOV added to the obd's export flags.
 * @test_export is called for each candidate while its exp_lock is held.
 * Candidates that are not filtered out are marked exp_failed and moved to
 * the work list before being logged as "EVICTING".
 * NOTE(review): the statements that skip a filtered export and that count
 * `evicted` are not visible in this view - confirm against the full file.
 */
1319 /* Remove exports that have not completed recovery.
1321 void class_disconnect_stale_exports(struct obd_device *obd,
1322 int (*test_export)(struct obd_export *))
1324 struct list_head work_list;
1325 struct obd_export *exp, *n;
1329 INIT_LIST_HEAD(&work_list);
1330 spin_lock(&obd->obd_dev_lock);
1331 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1333 /* don't count self-export as client */
1334 if (obd_uuid_equals(&exp->exp_client_uuid,
1335 &exp->exp_obd->obd_uuid))
1338 /* don't evict clients which have no slot in last_rcvd
1339 * (e.g. lightweight connection) */
1340 if (exp->exp_target_data.ted_lr_idx == -1)
/* Already-failed exports, and those for which test_export() returns
 * non-zero, take the branch that only releases exp_lock. */
1343 spin_lock(&exp->exp_lock);
1344 if (exp->exp_failed || test_export(exp)) {
1345 spin_unlock(&exp->exp_lock);
1348 exp->exp_failed = 1;
1349 spin_unlock(&exp->exp_lock);
/* list_move() is safe here because the walk uses the _safe iterator. */
1351 list_move(&exp->exp_obd_chain, &work_list);
1353 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1354 obd->obd_name, exp->exp_client_uuid.uuid,
1355 exp->exp_connection == NULL ? "<unknown>" :
1356 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1357 print_export_data(exp, "EVICTING", 0);
1359 spin_unlock(&obd->obd_dev_lock);
1362 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1363 obd->obd_name, evicted);
1365 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1366 OBD_OPT_ABORT_RECOV);
1369 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark @exp as failed and disconnect it.  The previous value of exp_failed
 * is read and the flag set in one exp_lock critical section, so a second
 * caller can tell that the export was already failed.
 * NOTE(review): the early exit for the already-failed case and the rc test
 * guarding the CERROR are not visible in this view - confirm.
 */
1371 void class_fail_export(struct obd_export *exp)
1373 int rc, already_failed;
1375 spin_lock(&exp->exp_lock);
1376 already_failed = exp->exp_failed;
1377 exp->exp_failed = 1;
1378 spin_unlock(&exp->exp_lock);
1380 if (already_failed) {
1381 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1382 exp, exp->exp_client_uuid.uuid);
1386 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1387 exp, exp->exp_client_uuid.uuid);
/* Optionally dump the debug log before the export goes away. */
1389 if (obd_dump_on_timeout)
1390 libcfs_debug_dumplog();
1392 /* need for safe call CDEBUG after obd_disconnect */
1393 class_export_get(exp);
1395 /* Most callers into obd_disconnect are removing their own reference
1396 * (request, for example) in addition to the one from the hash table.
1397 * We don't have such a reference here, so make one. */
1398 class_export_get(exp);
1399 rc = obd_disconnect(exp);
1401 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1403 CDEBUG(D_HA, "disconnected export %p/%s\n",
1404 exp, exp->exp_client_uuid.uuid);
/* Drops the first of the two references taken above (the logging one). */
1405 class_export_put(exp);
1407 EXPORT_SYMBOL(class_fail_export);
/*
 * Return the peer NID of @exp's connection formatted by libcfs_nid2str().
 * NOTE(review): the value returned when exp_connection is NULL is not
 * visible in this view - confirm against the full file.
 */
1409 char *obd_export_nid2str(struct obd_export *exp)
1411 if (exp->exp_connection != NULL)
1412 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1416 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict the export(s) of @obd whose peer NID matches the string @nid.
 * Returns exports_evicted; when obd->obd_stopping is set it returns at once
 * with the initial value 0.  A reference on obd_nid_hash is taken under
 * obd_dev_lock and released with cfs_hash_putref() after the lookup work.
 * NOTE(review): the loop around the lookup and the increment of
 * exports_evicted are not visible in this view - confirm.
 */
1418 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1420 cfs_hash_t *nid_hash;
1421 struct obd_export *doomed_exp = NULL;
1422 int exports_evicted = 0;
/* The cast drops const because libcfs_str2nid() is called with a char *. */
1424 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1426 spin_lock(&obd->obd_dev_lock);
1427 /* umount has run already, so evict thread should leave
1428 * its task to umount thread now */
1429 if (obd->obd_stopping) {
1430 spin_unlock(&obd->obd_dev_lock);
1431 return exports_evicted;
1433 nid_hash = obd->obd_nid_hash;
1434 cfs_hash_getref(nid_hash);
1435 spin_unlock(&obd->obd_dev_lock);
1438 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1439 if (doomed_exp == NULL)
/* Sanity checks: the hash must return an export with the requested NID,
 * and the obd's self-export must never be hashed by NID. */
1442 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1443 "nid %s found, wanted nid %s, requested nid %s\n",
1444 obd_export_nid2str(doomed_exp),
1445 libcfs_nid2str(nid_key), nid);
1446 LASSERTF(doomed_exp != obd->obd_self_export,
1447 "self-export is hashed by NID?\n");
1449 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1450 "request\n", obd->obd_name,
1451 obd_uuid2str(&doomed_exp->exp_client_uuid),
1452 obd_export_nid2str(doomed_exp));
/* presumably this put balances a reference returned by cfs_hash_lookup()
 * - confirm */
1453 class_fail_export(doomed_exp);
1454 class_export_put(doomed_exp);
1457 cfs_hash_putref(nid_hash);
1459 if (!exports_evicted)
1460 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1461 obd->obd_name, nid);
1462 return exports_evicted;
1464 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of @obd whose client uuid equals the string @uuid.
 * Returns exports_evicted.  It returns early with the initial value 0 when
 * obd->obd_stopping is set or when @uuid is the obd's own uuid.  The
 * reference taken on obd_uuid_hash is released on every visible exit path
 * after it has been acquired.
 * NOTE(review): the increment of exports_evicted is not visible in this
 * view - confirm against the full file.
 */
1466 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1468 cfs_hash_t *uuid_hash;
1469 struct obd_export *doomed_exp = NULL;
1470 struct obd_uuid doomed_uuid;
1471 int exports_evicted = 0;
1473 spin_lock(&obd->obd_dev_lock);
1474 if (obd->obd_stopping) {
1475 spin_unlock(&obd->obd_dev_lock);
1476 return exports_evicted;
1478 uuid_hash = obd->obd_uuid_hash;
1479 cfs_hash_getref(uuid_hash);
1480 spin_unlock(&obd->obd_dev_lock);
/* Refuse to evict the device's own uuid. */
1482 obd_str2uuid(&doomed_uuid, uuid);
1483 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1484 CERROR("%s: can't evict myself\n", obd->obd_name);
1485 cfs_hash_putref(uuid_hash);
1486 return exports_evicted;
1489 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1491 if (doomed_exp == NULL) {
1492 CERROR("%s: can't disconnect %s: no exports found\n",
1493 obd->obd_name, uuid);
1495 CWARN("%s: evicting %s at adminstrative request\n",
1496 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
/* presumably this put balances a reference returned by cfs_hash_lookup()
 * - confirm */
1497 class_fail_export(doomed_exp);
1498 class_export_put(doomed_exp);
1501 cfs_hash_putref(uuid_hash);
1503 return exports_evicted;
/* Optional per-export dump hook, NULL by default.  It only exists when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is enabled; print_export_data() calls it when
 * it is set and the caller asked for lock details. */
1506 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1507 void (*class_export_dump_hook)(struct obd_export*) = NULL;
/*
 * Emit one D_HA debug line describing @exp, tagged with @status.  The line
 * carries the obd name, client uuid, peer NID, export refcount, the rpc /
 * callback / lock counters, the disconnected, delayed and failed flags, the
 * outstanding reply count with the first reply pointer, and
 * exp_last_committed.  exp_outstanding_replies is walked under exp_lock.
 * NOTE(review): the rest of the parameter list and the loop body that fills
 * nreplies and first_reply are not visible in this view - confirm.
 */
1510 static void print_export_data(struct obd_export *exp, const char *status,
1513 struct ptlrpc_reply_state *rs;
1514 struct ptlrpc_reply_state *first_reply = NULL;
1517 spin_lock(&exp->exp_lock);
1518 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1524 spin_unlock(&exp->exp_lock);
/* "..." is printed when more than three replies are outstanding. */
1526 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1527 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1528 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1529 atomic_read(&exp->exp_rpc_count),
1530 atomic_read(&exp->exp_cb_count),
1531 atomic_read(&exp->exp_locks_count),
1532 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1533 nreplies, first_reply, nreplies > 3 ? "..." : "",
1534 exp->exp_last_committed);
/* The extra lock dump is compiled in only with LUSTRE_TRACKS_LOCK_EXP_REFS
 * and runs only when `locks` is non-zero and a hook has been installed. */
1535 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1536 if (locks && class_export_dump_hook != NULL)
1537 class_export_dump_hook(exp);
/*
 * Log every export known for @obd through print_export_data(), passing
 * @locks straight through.  The three per-device lists are walked under
 * obd_dev_lock and tagged ACTIVE, UNLINKED and DELAYED; the global
 * obd_zombie_exports list is walked under obd_zombie_impexp_lock and tagged
 * ZOMBIE.  The zombie walk is not filtered by @obd.
 */
1541 void dump_exports(struct obd_device *obd, int locks)
1543 struct obd_export *exp;
1545 spin_lock(&obd->obd_dev_lock);
1546 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1547 print_export_data(exp, "ACTIVE", locks);
1548 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1549 print_export_data(exp, "UNLINKED", locks);
1550 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1551 print_export_data(exp, "DELAYED", locks);
1552 spin_unlock(&obd->obd_dev_lock);
1553 spin_lock(&obd_zombie_impexp_lock);
1554 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1555 print_export_data(exp, "ZOMBIE", locks);
1556 spin_unlock(&obd_zombie_impexp_lock);
/*
 * Block until @obd's obd_unlinked_exports list is empty.  The caller must
 * already have emptied obd_exports (asserted).  obd_dev_lock is held only
 * for the list test and is dropped around each sleep.
 * NOTE(review): the initialisation and update of `waited` are not visible
 * in this view - confirm against the full file.
 */
1559 void obd_exports_barrier(struct obd_device *obd)
1562 LASSERT(list_empty(&obd->obd_exports));
1563 spin_lock(&obd->obd_dev_lock);
1564 while (!list_empty(&obd->obd_unlinked_exports)) {
1565 spin_unlock(&obd->obd_dev_lock);
1566 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1567 cfs_time_seconds(waited));
/* Warn and dump the exports only when `waited` exceeds 5 and is a power of
 * two, so the console is not flooded during a long wait. */
1568 if (waited > 5 && IS_PO2(waited)) {
1569 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1570 "more than %d seconds. "
1571 "The obd refcount = %d. Is it stuck?\n",
1572 obd->obd_name, waited,
1573 atomic_read(&obd->obd_refcount));
1574 dump_exports(obd, 1);
1577 spin_lock(&obd->obd_dev_lock);
1579 spin_unlock(&obd->obd_dev_lock);
1581 EXPORT_SYMBOL(obd_exports_barrier);
1583 /* Total amount of zombies to be destroyed */
/* Read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle(). */
1584 static int zombies_count = 0;
/*
 * Each pass detaches at most one import and one export from the heads of
 * the zombie lists under obd_zombie_impexp_lock, then destroys them with
 * class_import_destroy() / class_export_destroy().  Passes repeat until one
 * finds neither an import nor an export.
 * NOTE(review): the statements inside the two re-locked sections are not
 * visible in this view - presumably zombies_count is decremented there.
 */
1587 * kill zombie imports and exports
1589 void obd_zombie_impexp_cull(void)
1591 struct obd_import *import;
1592 struct obd_export *export;
1596 spin_lock(&obd_zombie_impexp_lock);
1599 if (!list_empty(&obd_zombie_imports)) {
1600 import = list_entry(obd_zombie_imports.next,
1603 list_del_init(&import->imp_zombie_chain);
1607 if (!list_empty(&obd_zombie_exports)) {
1608 export = list_entry(obd_zombie_exports.next,
1611 list_del_init(&export->exp_obd_chain);
1614 spin_unlock(&obd_zombie_impexp_lock);
/* Destruction runs with obd_zombie_impexp_lock released; the lock is taken
 * again only for the short section after each destroy call. */
1616 if (import != NULL) {
1617 class_import_destroy(import);
1618 spin_lock(&obd_zombie_impexp_lock);
1620 spin_unlock(&obd_zombie_impexp_lock);
1623 if (export != NULL) {
1624 class_export_destroy(export);
1625 spin_lock(&obd_zombie_impexp_lock);
1627 spin_unlock(&obd_zombie_impexp_lock);
1631 } while (import != NULL || export != NULL);
/* State shared between the zombie destroy thread and the code that starts,
 * stops and waits for it: start/stop completions, a flags word, the wait
 * queue used for wake-ups, and the pid recorded by the thread itself. */
1635 static struct completion obd_zombie_start;
1636 static struct completion obd_zombie_stop;
1637 static unsigned long obd_zombie_flags;
1638 static wait_queue_head_t obd_zombie_waitq;
1639 static pid_t obd_zombie_pid;
/* NOTE(review): OBD_ZOMBIE_STOP is passed to set_bit() and test_bit() as a
 * bit index elsewhere in this file, so the value 0x0001 selects bit 1 of
 * obd_zombie_flags rather than acting as a mask - confirm this is intended. */
1642 OBD_ZOMBIE_STOP = 0x0001,
/*
 * Wait predicate for the zombie thread.  rc is non-zero when there is
 * nothing to do: no zombies are queued and OBD_ZOMBIE_STOP is not set.  The
 * thread sleeps on the negation of this value.  @arg is not used in the
 * visible lines.
 */
1646 * check for work for kill zombie import/export thread.
1648 static int obd_zombie_impexp_check(void *arg)
1652 spin_lock(&obd_zombie_impexp_lock);
1653 rc = (zombies_count == 0) &&
1654 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1655 spin_unlock(&obd_zombie_impexp_lock);
1661 * Add export to the obd_zombie thread and notify it.
/*
 * Hand @exp over to the zombie thread: unlink it from its obd's list under
 * obd_dev_lock (asserting that it was linked), queue it on
 * obd_zombie_exports under obd_zombie_impexp_lock, then wake the waiters.
 * exp_obd_chain is reused as the link on the zombie list.
 * NOTE(review): the zombies_count update is not visible in this view.
 */
1663 static void obd_zombie_export_add(struct obd_export *exp) {
1664 spin_lock(&exp->exp_obd->obd_dev_lock);
1665 LASSERT(!list_empty(&exp->exp_obd_chain));
1666 list_del_init(&exp->exp_obd_chain);
1667 spin_unlock(&exp->exp_obd->obd_dev_lock);
1668 spin_lock(&obd_zombie_impexp_lock);
1670 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1671 spin_unlock(&obd_zombie_impexp_lock);
1673 obd_zombie_impexp_notify();
1677 * Add import to the obd_zombie thread and notify it.
/*
 * Hand @imp over to the zombie thread.  The import must already have
 * imp_sec and imp_rq_pool cleared and must not be on the zombie list (all
 * asserted).  It is queued on obd_zombie_imports under
 * obd_zombie_impexp_lock, after which the waiters are woken.
 * NOTE(review): the zombies_count update is not visible in this view.
 */
1679 static void obd_zombie_import_add(struct obd_import *imp) {
1680 LASSERT(imp->imp_sec == NULL);
1681 LASSERT(imp->imp_rq_pool == NULL);
1682 spin_lock(&obd_zombie_impexp_lock);
1683 LASSERT(list_empty(&imp->imp_zombie_chain));
1685 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1686 spin_unlock(&obd_zombie_impexp_lock);
1688 obd_zombie_impexp_notify();
/* Wakes every waiter on obd_zombie_waitq.  wake_up_all() is used so that
 * the destroy thread is woken even when barrier callers are sleeping on the
 * same queue, as the note inside the function explains. */
1692 * notify import/export destroy thread about new zombie.
1694 static void obd_zombie_impexp_notify(void)
1697 * Make sure obd_zomebie_impexp_thread get this notification.
1698 * It is possible this signal only get by obd_zombie_barrier, and
1699 * barrier gulps this notification and sleeps away and hangs ensues
1701 wake_up_all(&obd_zombie_waitq);
/* Wait predicate for obd_zombie_barrier(): rc is non-zero when
 * zombies_count is zero, read under obd_zombie_impexp_lock.  Asserts that
 * OBD_ZOMBIE_STOP has not been set. */
1705 * check whether obd_zombie is idle
1707 static int obd_zombie_is_idle(void)
1711 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1712 spin_lock(&obd_zombie_impexp_lock);
1713 rc = (zombies_count == 0);
1714 spin_unlock(&obd_zombie_impexp_lock);
/*
 * Sleeps on obd_zombie_waitq until obd_zombie_is_idle() reports that no
 * zombies remain.  The call is special-cased when made from the zombie
 * thread itself (obd_zombie_pid == current_pid()).
 * NOTE(review): the statement taken in that case is not visible in this
 * view - presumably an early return; confirm.
 */
1719 * wait when obd_zombie import/export queues become empty
1721 void obd_zombie_barrier(void)
1723 struct l_wait_info lwi = { 0 };
1725 if (obd_zombie_pid == current_pid())
1726 /* don't wait for myself */
1728 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1730 EXPORT_SYMBOL(obd_zombie_barrier);
/*
 * Body of the zombie destroy thread.  It completes obd_zombie_start,
 * records its own pid in obd_zombie_pid, then loops until OBD_ZOMBIE_STOP
 * is set.  Each iteration sleeps until obd_zombie_impexp_check() returns 0,
 * culls the zombie lists, and calls wake_up() on obd_zombie_waitq.
 * obd_zombie_stop is completed once the loop exits.  @unused is ignored.
 */
1734 * destroy zombie export/import thread.
1736 static int obd_zombie_impexp_thread(void *unused)
1738 unshare_fs_struct();
1739 complete(&obd_zombie_start);
1741 obd_zombie_pid = current_pid();
1743 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1744 struct l_wait_info lwi = { 0 };
1746 l_wait_event(obd_zombie_waitq,
1747 !obd_zombie_impexp_check(NULL), &lwi);
1748 obd_zombie_impexp_cull();
1751 * Notify obd_zombie_barrier callers that queues
1754 wake_up(&obd_zombie_waitq);
1757 complete(&obd_zombie_stop);
/*
 * Initialise the zombie lists, lock, completions and wait queue, start the
 * "obd_zombid" kthread, and wait until the thread has completed
 * obd_zombie_start.  The visible error path returns PTR_ERR(task).
 * NOTE(review): the condition guarding that path and the success return
 * value are not visible in this view - confirm.
 */
1764 * start destroy zombie import/export thread
1766 int obd_zombie_impexp_init(void)
1768 struct task_struct *task;
1770 INIT_LIST_HEAD(&obd_zombie_imports);
1772 INIT_LIST_HEAD(&obd_zombie_exports);
1773 spin_lock_init(&obd_zombie_impexp_lock);
1774 init_completion(&obd_zombie_start);
1775 init_completion(&obd_zombie_stop);
1776 init_waitqueue_head(&obd_zombie_waitq);
1779 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1781 RETURN(PTR_ERR(task));
1783 wait_for_completion(&obd_zombie_start);
/* Sets OBD_ZOMBIE_STOP, wakes the waiters on obd_zombie_waitq so the thread
 * sees the flag, then blocks until the thread completes obd_zombie_stop. */
1787 * stop destroy zombie import/export thread
1789 void obd_zombie_impexp_stop(void)
1791 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1792 obd_zombie_impexp_notify();
1793 wait_for_completion(&obd_zombie_stop);
1796 /***** Kernel-userspace comm helpers *******/
1798 /* Get length of entire message, including header */
/* The result is sizeof(struct kuc_hdr) + @payload_len, computed in int with
 * no range check on @payload_len. */
1799 int kuc_len(int payload_len)
1801 return sizeof(struct kuc_hdr) + payload_len;
1803 EXPORT_SYMBOL(kuc_len);
/* The header is taken to sit immediately before the payload, so its address
 * is @p minus one struct kuc_hdr.  The magic field is asserted, which means
 * a pointer that is not a kuc payload trips LASSERT rather than returning
 * an error. */
1805 /* Get a pointer to kuc header, given a ptr to the payload
1806 * @param p Pointer to payload area
1807 * @returns Pointer to kuc header
1809 struct kuc_hdr * kuc_ptr(void *p)
1811 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1812 LASSERT(lh->kuc_magic == KUC_MAGIC);
1815 EXPORT_SYMBOL(kuc_ptr);
/* Non-asserting counterpart of kuc_ptr(): it reads the struct kuc_hdr that
 * would precede @p and compares its magic with KUC_MAGIC.  The memory just
 * before @p is read unconditionally.
 * NOTE(review): the two return values are not visible in this view. */
1817 /* Test if payload is part of kuc message
1818 * @param p Pointer to payload area
1821 int kuc_ispayload(void *p)
1823 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1825 if (kh->kuc_magic == KUC_MAGIC)
1830 EXPORT_SYMBOL(kuc_ispayload);
/* The header is filled with KUC_MAGIC, @transport, @type and the total
 * length from kuc_len(@payload_len).  The returned pointer is the address
 * just past the header; the failure path returns ERR_PTR(-ENOMEM), so
 * callers need to test the result with IS_ERR() and not against NULL.
 * NOTE(review): the allocation call itself is not visible in this view. */
1832 /* Alloc space for a message, and fill in header
1833 * @return Pointer to payload area
1835 void *kuc_alloc(int payload_len, int transport, int type)
1838 int len = kuc_len(payload_len);
1842 return ERR_PTR(-ENOMEM);
1844 lh->kuc_magic = KUC_MAGIC;
1845 lh->kuc_transport = transport;
1846 lh->kuc_msgtype = type;
1847 lh->kuc_msglen = len;
1849 return (void *)(lh + 1);
1851 EXPORT_SYMBOL(kuc_alloc);
1853 /* Takes pointer to payload area */
/* kuc_ptr() asserts the header magic before the header is freed.  The size
 * given to OBD_FREE is kuc_len(@payload_len), so @payload_len presumably
 * has to match the value used when the message was allocated - confirm. */
1854 inline void kuc_free(void *p, int payload_len)
1856 struct kuc_hdr *lh = kuc_ptr(p);
1857 OBD_FREE(lh, kuc_len(payload_len));
1859 EXPORT_SYMBOL(kuc_free);
/* Per-waiter record used by obd_get_request_slot(), which declares it on
 * its own stack.  orsw_entry links the waiter on cli->cl_loi_read_list and
 * orsw_waitq is the queue the waiter sleeps on. */
1861 struct obd_request_slot_waiter {
1862 struct list_head orsw_entry;
1863 wait_queue_head_t orsw_waitq;
/* Wait predicate for obd_get_request_slot().  A waiter has been given a
 * slot once its entry is off the list; obd_put_request_slot() and
 * obd_set_max_rpcs_in_flight() do that with list_del_init() before waking
 * it.  The test is made under cl_loi_list_lock. */
1867 static bool obd_request_slot_avail(struct client_obd *cli,
1868 struct obd_request_slot_waiter *orsw)
1872 spin_lock(&cli->cl_loi_list_lock);
1873 avail = !!list_empty(&orsw->orsw_entry);
1874 spin_unlock(&cli->cl_loi_list_lock);
1880 * For network flow control, the RPC sponsor needs to acquire a credit
1881 * before sending the RPC. The credits count for a connection is defined
1882 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
1883 * the subsequent RPC sponsors need to wait until others release their
1884 * credits, or the administrator increases the "cl_max_rpcs_in_flight".
/*
 * Acquire one RPC slot on @cli.  On the fast path cl_r_in_flight is below
 * cl_max_rpcs_in_flight and is incremented under cl_loi_list_lock.
 * Otherwise an on-stack waiter is queued at the tail of cl_loi_read_list
 * and the caller sleeps on its private wait queue until
 * obd_request_slot_avail() is true or another condition holds.
 * NOTE(review): that second wait condition, the tests guarding the cleanup
 * after the wait, and the return values are not visible in this view -
 * confirm against the full file before relying on this description.
 */
1886 int obd_get_request_slot(struct client_obd *cli)
1888 struct obd_request_slot_waiter orsw;
1889 struct l_wait_info lwi;
1892 spin_lock(&cli->cl_loi_list_lock);
1893 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1894 cli->cl_r_in_flight++;
1895 spin_unlock(&cli->cl_loi_list_lock);
/* Slow path: queue this caller; the lock taken above is still held. */
1899 init_waitqueue_head(&orsw.orsw_waitq);
1900 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1901 orsw.orsw_signaled = false;
1902 spin_unlock(&cli->cl_loi_list_lock);
1904 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1905 rc = l_wait_event(orsw.orsw_waitq,
1906 obd_request_slot_avail(cli, &orsw) ||
1910 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1911 * freed but other (such as obd_put_request_slot) is using it. */
1912 spin_lock(&cli->cl_loi_list_lock);
/* In the visible lines a waiter that was not "signaled" either gives back a
 * slot already accounted to it (entry off the list) or unlinks itself. */
1914 if (!orsw.orsw_signaled) {
1915 if (list_empty(&orsw.orsw_entry))
1916 cli->cl_r_in_flight--;
1918 list_del(&orsw.orsw_entry);
1922 if (orsw.orsw_signaled) {
1923 LASSERT(list_empty(&orsw.orsw_entry));
1927 spin_unlock(&cli->cl_loi_list_lock);
1931 EXPORT_SYMBOL(obd_get_request_slot);
/*
 * Release one RPC slot on @cli.  If a waiter is queued and the in-flight
 * count is now below the limit, the slot goes straight to the first waiter:
 * it is unlinked, cl_r_in_flight is incremented on its behalf, and it is
 * woken.  Everything runs under cl_loi_list_lock.
 */
1933 void obd_put_request_slot(struct client_obd *cli)
1935 struct obd_request_slot_waiter *orsw;
1937 spin_lock(&cli->cl_loi_list_lock);
1938 cli->cl_r_in_flight--;
1940 /* If there is free slot, wakeup the first waiter. */
1941 if (!list_empty(&cli->cl_loi_read_list) &&
1942 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1943 orsw = list_entry(cli->cl_loi_read_list.next,
1944 struct obd_request_slot_waiter, orsw_entry);
/* list_del_init() leaves the entry empty, which is what
 * obd_request_slot_avail() tests for. */
1945 list_del_init(&orsw->orsw_entry);
1946 cli->cl_r_in_flight++;
1947 wake_up(&orsw->orsw_waitq);
1949 spin_unlock(&cli->cl_loi_list_lock);
1951 EXPORT_SYMBOL(obd_put_request_slot);
/* Return the current RPC slot limit of @cli.  The field is read without
 * taking cl_loi_list_lock. */
1953 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1955 return cli->cl_max_rpcs_in_flight;
1957 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/*
 * Change the RPC slot limit of @cli to @max.  Values below 1 or above
 * OBD_MAX_RIF_MAX are rejected before any state is touched.  The new limit
 * is stored under cl_loi_list_lock, and up to `diff` queued waiters are then
 * each given a slot and woken.
 * NOTE(review): the computation of `diff` (presumably max - old) and the
 * return values are not visible in this view - confirm.
 */
1959 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1961 struct obd_request_slot_waiter *orsw;
1966 if (max > OBD_MAX_RIF_MAX || max < 1)
1969 spin_lock(&cli->cl_loi_list_lock);
1970 old = cli->cl_max_rpcs_in_flight;
1971 cli->cl_max_rpcs_in_flight = max;
1974 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1975 for (i = 0; i < diff; i++) {
1976 if (list_empty(&cli->cl_loi_read_list))
/* Same hand-over as in obd_put_request_slot(): unlink the first waiter,
 * account the slot to it, then wake it. */
1979 orsw = list_entry(cli->cl_loi_read_list.next,
1980 struct obd_request_slot_waiter, orsw_entry);
1981 list_del_init(&orsw->orsw_entry);
1982 cli->cl_r_in_flight++;
1983 wake_up(&orsw->orsw_waitq);
1985 spin_unlock(&cli->cl_loi_list_lock);
1989 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);