4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <linux/kthread.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_kernelcomm.h>
49 spinlock_t obd_types_lock;
51 static struct kmem_cache *obd_device_cachep;
52 struct kmem_cache *obdo_cachep;
53 EXPORT_SYMBOL(obdo_cachep);
54 static struct kmem_cache *import_cachep;
56 static struct list_head obd_zombie_imports;
57 static struct list_head obd_zombie_exports;
58 static spinlock_t obd_zombie_impexp_lock;
60 static void obd_zombie_impexp_notify(void);
61 static void obd_zombie_export_add(struct obd_export *exp);
62 static void obd_zombie_import_add(struct obd_import *imp);
63 static void print_export_data(struct obd_export *exp,
64 const char *status, int locks);
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
70 * support functions: we could use inter-module communication, but this
71 * is more portable to other OS's
73 static struct obd_device *obd_device_alloc(void)
75 struct obd_device *obd;
77 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
79 obd->obd_magic = OBD_DEVICE_MAGIC;
84 static void obd_device_free(struct obd_device *obd)
87 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89 if (obd->obd_namespace != NULL) {
90 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91 obd, obd->obd_namespace, obd->obd_force);
94 lu_ref_fini(&obd->obd_reference);
95 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
98 struct obd_type *class_search_type(const char *name)
100 struct list_head *tmp;
101 struct obd_type *type;
103 spin_lock(&obd_types_lock);
104 list_for_each(tmp, &obd_types) {
105 type = list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
107 spin_unlock(&obd_types_lock);
111 spin_unlock(&obd_types_lock);
114 EXPORT_SYMBOL(class_search_type);
116 struct obd_type *class_get_type(const char *name)
118 struct obd_type *type = class_search_type(name);
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
122 const char *modname = name;
124 if (strcmp(modname, "obdfilter") == 0)
127 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
128 modname = LUSTRE_OSP_NAME;
130 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
131 modname = LUSTRE_MDT_NAME;
133 if (!request_module("%s", modname)) {
134 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
135 type = class_search_type(name);
137 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
143 spin_lock(&type->obd_type_lock);
145 try_module_get(type->typ_dt_ops->o_owner);
146 spin_unlock(&type->obd_type_lock);
151 void class_put_type(struct obd_type *type)
154 spin_lock(&type->obd_type_lock);
156 module_put(type->typ_dt_ops->o_owner);
157 spin_unlock(&type->obd_type_lock);
160 #define CLASS_MAX_NAME 1024
162 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
163 bool enable_proc, struct lprocfs_vars *vars,
164 const char *name, struct lu_device_type *ldt)
166 struct obd_type *type;
171 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
173 if (class_search_type(name)) {
174 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
179 OBD_ALLOC(type, sizeof(*type));
183 OBD_ALLOC_PTR(type->typ_dt_ops);
184 OBD_ALLOC_PTR(type->typ_md_ops);
185 OBD_ALLOC(type->typ_name, strlen(name) + 1);
187 if (type->typ_dt_ops == NULL ||
188 type->typ_md_ops == NULL ||
189 type->typ_name == NULL)
192 *(type->typ_dt_ops) = *dt_ops;
193 /* md_ops is optional */
195 *(type->typ_md_ops) = *md_ops;
196 strcpy(type->typ_name, name);
197 spin_lock_init(&type->obd_type_lock);
199 #ifdef CONFIG_PROC_FS
201 type->typ_procroot = lprocfs_register(type->typ_name,
204 if (IS_ERR(type->typ_procroot)) {
205 rc = PTR_ERR(type->typ_procroot);
206 type->typ_procroot = NULL;
213 rc = lu_device_type_init(ldt);
218 spin_lock(&obd_types_lock);
219 list_add(&type->typ_chain, &obd_types);
220 spin_unlock(&obd_types_lock);
225 if (type->typ_name != NULL) {
226 #ifdef CONFIG_PROC_FS
227 if (type->typ_procroot != NULL)
228 remove_proc_subtree(type->typ_name, proc_lustre_root);
230 OBD_FREE(type->typ_name, strlen(name) + 1);
232 if (type->typ_md_ops != NULL)
233 OBD_FREE_PTR(type->typ_md_ops);
234 if (type->typ_dt_ops != NULL)
235 OBD_FREE_PTR(type->typ_dt_ops);
236 OBD_FREE(type, sizeof(*type));
239 EXPORT_SYMBOL(class_register_type);
241 int class_unregister_type(const char *name)
243 struct obd_type *type = class_search_type(name);
247 CERROR("unknown obd type\n");
251 if (type->typ_refcnt) {
252 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
253 /* This is a bad situation, let's make the best of it */
254 /* Remove ops, but leave the name for debugging */
255 OBD_FREE_PTR(type->typ_dt_ops);
256 OBD_FREE_PTR(type->typ_md_ops);
260 /* we do not use type->typ_procroot as for compatibility purposes
261 * other modules can share names (i.e. lod can use lov entry). so
262 * we can't reference pointer as it can get invalided when another
263 * module removes the entry */
264 #ifdef CONFIG_PROC_FS
265 if (type->typ_procroot != NULL)
266 remove_proc_subtree(type->typ_name, proc_lustre_root);
267 if (type->typ_procsym != NULL)
268 lprocfs_remove(&type->typ_procsym);
271 lu_device_type_fini(type->typ_lu);
273 spin_lock(&obd_types_lock);
274 list_del(&type->typ_chain);
275 spin_unlock(&obd_types_lock);
276 OBD_FREE(type->typ_name, strlen(name) + 1);
277 if (type->typ_dt_ops != NULL)
278 OBD_FREE_PTR(type->typ_dt_ops);
279 if (type->typ_md_ops != NULL)
280 OBD_FREE_PTR(type->typ_md_ops);
281 OBD_FREE(type, sizeof(*type));
283 } /* class_unregister_type */
284 EXPORT_SYMBOL(class_unregister_type);
287 * Create a new obd device.
289 * Find an empty slot in ::obd_devs[], create a new obd device in it.
291 * \param[in] type_name obd device type string.
292 * \param[in] name obd device name.
294 * \retval NULL if create fails, otherwise return the obd device
297 struct obd_device *class_newdev(const char *type_name, const char *name)
299 struct obd_device *result = NULL;
300 struct obd_device *newdev;
301 struct obd_type *type = NULL;
303 int new_obd_minor = 0;
306 if (strlen(name) >= MAX_OBD_NAME) {
307 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
308 RETURN(ERR_PTR(-EINVAL));
311 type = class_get_type(type_name);
313 CERROR("OBD: unknown type: %s\n", type_name);
314 RETURN(ERR_PTR(-ENODEV));
317 newdev = obd_device_alloc();
319 GOTO(out_type, result = ERR_PTR(-ENOMEM));
321 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
323 write_lock(&obd_dev_lock);
324 for (i = 0; i < class_devno_max(); i++) {
325 struct obd_device *obd = class_num2obd(i);
327 if (obd && (strcmp(name, obd->obd_name) == 0)) {
328 CERROR("Device %s already exists at %d, won't add\n",
331 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
332 "%p obd_magic %08x != %08x\n", result,
333 result->obd_magic, OBD_DEVICE_MAGIC);
334 LASSERTF(result->obd_minor == new_obd_minor,
335 "%p obd_minor %d != %d\n", result,
336 result->obd_minor, new_obd_minor);
338 obd_devs[result->obd_minor] = NULL;
339 result->obd_name[0]='\0';
341 result = ERR_PTR(-EEXIST);
344 if (!result && !obd) {
346 result->obd_minor = i;
348 result->obd_type = type;
349 strncpy(result->obd_name, name,
350 sizeof(result->obd_name) - 1);
351 obd_devs[i] = result;
354 write_unlock(&obd_dev_lock);
356 if (result == NULL && i >= class_devno_max()) {
357 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
359 GOTO(out, result = ERR_PTR(-EOVERFLOW));
365 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
366 result->obd_name, result);
370 obd_device_free(newdev);
372 class_put_type(type);
376 void class_release_dev(struct obd_device *obd)
378 struct obd_type *obd_type = obd->obd_type;
380 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
381 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
382 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
383 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
384 LASSERT(obd_type != NULL);
386 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
387 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
389 write_lock(&obd_dev_lock);
390 obd_devs[obd->obd_minor] = NULL;
391 write_unlock(&obd_dev_lock);
392 obd_device_free(obd);
394 class_put_type(obd_type);
397 int class_name2dev(const char *name)
404 read_lock(&obd_dev_lock);
405 for (i = 0; i < class_devno_max(); i++) {
406 struct obd_device *obd = class_num2obd(i);
408 if (obd && strcmp(name, obd->obd_name) == 0) {
409 /* Make sure we finished attaching before we give
410 out any references */
411 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
412 if (obd->obd_attached) {
413 read_unlock(&obd_dev_lock);
419 read_unlock(&obd_dev_lock);
424 struct obd_device *class_name2obd(const char *name)
426 int dev = class_name2dev(name);
428 if (dev < 0 || dev > class_devno_max())
430 return class_num2obd(dev);
432 EXPORT_SYMBOL(class_name2obd);
434 int class_uuid2dev(struct obd_uuid *uuid)
438 read_lock(&obd_dev_lock);
439 for (i = 0; i < class_devno_max(); i++) {
440 struct obd_device *obd = class_num2obd(i);
442 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
443 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
444 read_unlock(&obd_dev_lock);
448 read_unlock(&obd_dev_lock);
453 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
455 int dev = class_uuid2dev(uuid);
458 return class_num2obd(dev);
460 EXPORT_SYMBOL(class_uuid2obd);
463 * Get obd device from ::obd_devs[]
465 * \param num [in] array index
467 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
468 * otherwise return the obd device there.
470 struct obd_device *class_num2obd(int num)
472 struct obd_device *obd = NULL;
474 if (num < class_devno_max()) {
479 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
480 "%p obd_magic %08x != %08x\n",
481 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
482 LASSERTF(obd->obd_minor == num,
483 "%p obd_minor %0d != %0d\n",
484 obd, obd->obd_minor, num);
491 * Get obd devices count. Device in any
493 * \retval obd device count
495 int get_devices_count(void)
497 int index, max_index = class_devno_max(), dev_count = 0;
499 read_lock(&obd_dev_lock);
500 for (index = 0; index <= max_index; index++) {
501 struct obd_device *obd = class_num2obd(index);
505 read_unlock(&obd_dev_lock);
509 EXPORT_SYMBOL(get_devices_count);
511 void class_obd_list(void)
516 read_lock(&obd_dev_lock);
517 for (i = 0; i < class_devno_max(); i++) {
518 struct obd_device *obd = class_num2obd(i);
522 if (obd->obd_stopping)
524 else if (obd->obd_set_up)
526 else if (obd->obd_attached)
530 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
531 i, status, obd->obd_type->typ_name,
532 obd->obd_name, obd->obd_uuid.uuid,
533 atomic_read(&obd->obd_refcount));
535 read_unlock(&obd_dev_lock);
539 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
540 specified, then only the client with that uuid is returned,
541 otherwise any client connected to the tgt is returned. */
542 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
543 const char * typ_name,
544 struct obd_uuid *grp_uuid)
548 read_lock(&obd_dev_lock);
549 for (i = 0; i < class_devno_max(); i++) {
550 struct obd_device *obd = class_num2obd(i);
554 if ((strncmp(obd->obd_type->typ_name, typ_name,
555 strlen(typ_name)) == 0)) {
556 if (obd_uuid_equals(tgt_uuid,
557 &obd->u.cli.cl_target_uuid) &&
558 ((grp_uuid)? obd_uuid_equals(grp_uuid,
559 &obd->obd_uuid) : 1)) {
560 read_unlock(&obd_dev_lock);
565 read_unlock(&obd_dev_lock);
569 EXPORT_SYMBOL(class_find_client_obd);
571 /* Iterate the obd_device list looking devices have grp_uuid. Start
572 searching at *next, and if a device is found, the next index to look
573 at is saved in *next. If next is NULL, then the first matching device
574 will always be returned. */
575 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
581 else if (*next >= 0 && *next < class_devno_max())
586 read_lock(&obd_dev_lock);
587 for (; i < class_devno_max(); i++) {
588 struct obd_device *obd = class_num2obd(i);
592 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
595 read_unlock(&obd_dev_lock);
599 read_unlock(&obd_dev_lock);
603 EXPORT_SYMBOL(class_devices_in_group);
606 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
607 * adjust sptlrpc settings accordingly.
609 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
611 struct obd_device *obd;
615 LASSERT(namelen > 0);
617 read_lock(&obd_dev_lock);
618 for (i = 0; i < class_devno_max(); i++) {
619 obd = class_num2obd(i);
621 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
624 /* only notify mdc, osc, mdt, ost */
625 type = obd->obd_type->typ_name;
626 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
627 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
628 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
629 strcmp(type, LUSTRE_OST_NAME) != 0)
632 if (strncmp(obd->obd_name, fsname, namelen))
635 class_incref(obd, __FUNCTION__, obd);
636 read_unlock(&obd_dev_lock);
637 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
638 sizeof(KEY_SPTLRPC_CONF),
639 KEY_SPTLRPC_CONF, 0, NULL, NULL);
641 class_decref(obd, __FUNCTION__, obd);
642 read_lock(&obd_dev_lock);
644 read_unlock(&obd_dev_lock);
647 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
649 void obd_cleanup_caches(void)
652 if (obd_device_cachep) {
653 kmem_cache_destroy(obd_device_cachep);
654 obd_device_cachep = NULL;
657 kmem_cache_destroy(obdo_cachep);
661 kmem_cache_destroy(import_cachep);
662 import_cachep = NULL;
665 kmem_cache_destroy(capa_cachep);
671 int obd_init_caches(void)
676 LASSERT(obd_device_cachep == NULL);
677 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
678 sizeof(struct obd_device),
680 if (!obd_device_cachep)
681 GOTO(out, rc = -ENOMEM);
683 LASSERT(obdo_cachep == NULL);
684 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
687 GOTO(out, rc = -ENOMEM);
689 LASSERT(import_cachep == NULL);
690 import_cachep = kmem_cache_create("ll_import_cache",
691 sizeof(struct obd_import),
694 GOTO(out, rc = -ENOMEM);
696 LASSERT(capa_cachep == NULL);
697 capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
700 GOTO(out, rc = -ENOMEM);
704 obd_cleanup_caches();
708 /* map connection to client */
709 struct obd_export *class_conn2export(struct lustre_handle *conn)
711 struct obd_export *export;
715 CDEBUG(D_CACHE, "looking for null handle\n");
719 if (conn->cookie == -1) { /* this means assign a new connection */
720 CDEBUG(D_CACHE, "want a new connection\n");
724 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
725 export = class_handle2object(conn->cookie, NULL);
728 EXPORT_SYMBOL(class_conn2export);
730 struct obd_device *class_exp2obd(struct obd_export *exp)
736 EXPORT_SYMBOL(class_exp2obd);
738 struct obd_device *class_conn2obd(struct lustre_handle *conn)
740 struct obd_export *export;
741 export = class_conn2export(conn);
743 struct obd_device *obd = export->exp_obd;
744 class_export_put(export);
750 struct obd_import *class_exp2cliimp(struct obd_export *exp)
752 struct obd_device *obd = exp->exp_obd;
755 return obd->u.cli.cl_import;
757 EXPORT_SYMBOL(class_exp2cliimp);
759 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
761 struct obd_device *obd = class_conn2obd(conn);
764 return obd->u.cli.cl_import;
767 /* Export management functions */
768 static void class_export_destroy(struct obd_export *exp)
770 struct obd_device *obd = exp->exp_obd;
773 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
774 LASSERT(obd != NULL);
776 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
777 exp->exp_client_uuid.uuid, obd->obd_name);
779 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
780 if (exp->exp_connection)
781 ptlrpc_put_connection_superhack(exp->exp_connection);
783 LASSERT(list_empty(&exp->exp_outstanding_replies));
784 LASSERT(list_empty(&exp->exp_uncommitted_replies));
785 LASSERT(list_empty(&exp->exp_req_replay_queue));
786 LASSERT(list_empty(&exp->exp_hp_rpcs));
787 obd_destroy_export(exp);
788 class_decref(obd, "export", exp);
790 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
794 static void export_handle_addref(void *export)
796 class_export_get(export);
799 static struct portals_handle_ops export_handle_ops = {
800 .hop_addref = export_handle_addref,
804 struct obd_export *class_export_get(struct obd_export *exp)
806 atomic_inc(&exp->exp_refcount);
807 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
808 atomic_read(&exp->exp_refcount));
811 EXPORT_SYMBOL(class_export_get);
813 void class_export_put(struct obd_export *exp)
815 LASSERT(exp != NULL);
816 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
817 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
818 atomic_read(&exp->exp_refcount) - 1);
820 if (atomic_dec_and_test(&exp->exp_refcount)) {
821 LASSERT(!list_empty(&exp->exp_obd_chain));
822 CDEBUG(D_IOCTL, "final put %p/%s\n",
823 exp, exp->exp_client_uuid.uuid);
825 /* release nid stat refererence */
826 lprocfs_exp_cleanup(exp);
828 obd_zombie_export_add(exp);
831 EXPORT_SYMBOL(class_export_put);
833 /* Creates a new export, adds it to the hash table, and returns a
834 * pointer to it. The refcount is 2: one for the hash reference, and
835 * one for the pointer returned by this function. */
836 struct obd_export *class_new_export(struct obd_device *obd,
837 struct obd_uuid *cluuid)
839 struct obd_export *export;
840 struct cfs_hash *hash = NULL;
844 OBD_ALLOC_PTR(export);
846 return ERR_PTR(-ENOMEM);
848 export->exp_conn_cnt = 0;
849 export->exp_lock_hash = NULL;
850 export->exp_flock_hash = NULL;
851 atomic_set(&export->exp_refcount, 2);
852 atomic_set(&export->exp_rpc_count, 0);
853 atomic_set(&export->exp_cb_count, 0);
854 atomic_set(&export->exp_locks_count, 0);
855 #if LUSTRE_TRACKS_LOCK_EXP_REFS
856 INIT_LIST_HEAD(&export->exp_locks_list);
857 spin_lock_init(&export->exp_locks_list_guard);
859 atomic_set(&export->exp_replay_count, 0);
860 export->exp_obd = obd;
861 INIT_LIST_HEAD(&export->exp_outstanding_replies);
862 spin_lock_init(&export->exp_uncommitted_replies_lock);
863 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
864 INIT_LIST_HEAD(&export->exp_req_replay_queue);
865 INIT_LIST_HEAD(&export->exp_handle.h_link);
866 INIT_LIST_HEAD(&export->exp_hp_rpcs);
867 INIT_LIST_HEAD(&export->exp_reg_rpcs);
868 class_handle_hash(&export->exp_handle, &export_handle_ops);
869 export->exp_last_request_time = cfs_time_current_sec();
870 spin_lock_init(&export->exp_lock);
871 spin_lock_init(&export->exp_rpc_lock);
872 INIT_HLIST_NODE(&export->exp_uuid_hash);
873 INIT_HLIST_NODE(&export->exp_nid_hash);
874 spin_lock_init(&export->exp_bl_list_lock);
875 INIT_LIST_HEAD(&export->exp_bl_list);
877 export->exp_sp_peer = LUSTRE_SP_ANY;
878 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
879 export->exp_client_uuid = *cluuid;
880 obd_init_export(export);
882 spin_lock(&obd->obd_dev_lock);
883 /* shouldn't happen, but might race */
884 if (obd->obd_stopping)
885 GOTO(exit_unlock, rc = -ENODEV);
887 hash = cfs_hash_getref(obd->obd_uuid_hash);
889 GOTO(exit_unlock, rc = -ENODEV);
890 spin_unlock(&obd->obd_dev_lock);
892 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
893 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
895 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
896 obd->obd_name, cluuid->uuid, rc);
897 GOTO(exit_err, rc = -EALREADY);
901 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
902 spin_lock(&obd->obd_dev_lock);
903 if (obd->obd_stopping) {
904 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
905 GOTO(exit_unlock, rc = -ENODEV);
908 class_incref(obd, "export", export);
909 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
910 list_add_tail(&export->exp_obd_chain_timed,
911 &export->exp_obd->obd_exports_timed);
912 export->exp_obd->obd_num_exports++;
913 spin_unlock(&obd->obd_dev_lock);
914 cfs_hash_putref(hash);
918 spin_unlock(&obd->obd_dev_lock);
921 cfs_hash_putref(hash);
922 class_handle_unhash(&export->exp_handle);
923 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
924 obd_destroy_export(export);
925 OBD_FREE_PTR(export);
928 EXPORT_SYMBOL(class_new_export);
930 void class_unlink_export(struct obd_export *exp)
932 class_handle_unhash(&exp->exp_handle);
934 spin_lock(&exp->exp_obd->obd_dev_lock);
935 /* delete an uuid-export hashitem from hashtables */
936 if (!hlist_unhashed(&exp->exp_uuid_hash))
937 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
938 &exp->exp_client_uuid,
939 &exp->exp_uuid_hash);
941 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
942 list_del_init(&exp->exp_obd_chain_timed);
943 exp->exp_obd->obd_num_exports--;
944 spin_unlock(&exp->exp_obd->obd_dev_lock);
945 class_export_put(exp);
948 /* Import management functions */
949 static void class_import_destroy(struct obd_import *imp)
953 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
954 imp->imp_obd->obd_name);
956 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
958 ptlrpc_put_connection_superhack(imp->imp_connection);
960 while (!list_empty(&imp->imp_conn_list)) {
961 struct obd_import_conn *imp_conn;
963 imp_conn = list_entry(imp->imp_conn_list.next,
964 struct obd_import_conn, oic_item);
965 list_del_init(&imp_conn->oic_item);
966 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
967 OBD_FREE(imp_conn, sizeof(*imp_conn));
970 LASSERT(imp->imp_sec == NULL);
971 class_decref(imp->imp_obd, "import", imp);
972 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
976 static void import_handle_addref(void *import)
978 class_import_get(import);
981 static struct portals_handle_ops import_handle_ops = {
982 .hop_addref = import_handle_addref,
986 struct obd_import *class_import_get(struct obd_import *import)
988 atomic_inc(&import->imp_refcount);
989 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
990 atomic_read(&import->imp_refcount),
991 import->imp_obd->obd_name);
994 EXPORT_SYMBOL(class_import_get);
996 void class_import_put(struct obd_import *imp)
1000 LASSERT(list_empty(&imp->imp_zombie_chain));
1001 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1003 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1004 atomic_read(&imp->imp_refcount) - 1,
1005 imp->imp_obd->obd_name);
1007 if (atomic_dec_and_test(&imp->imp_refcount)) {
1008 CDEBUG(D_INFO, "final put import %p\n", imp);
1009 obd_zombie_import_add(imp);
1012 /* catch possible import put race */
1013 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1016 EXPORT_SYMBOL(class_import_put);
1018 static void init_imp_at(struct imp_at *at) {
1020 at_init(&at->iat_net_latency, 0, 0);
1021 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1022 /* max service estimates are tracked on the server side, so
1023 don't use the AT history here, just use the last reported
1024 val. (But keep hist for proc histogram, worst_ever) */
1025 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1030 struct obd_import *class_new_import(struct obd_device *obd)
1032 struct obd_import *imp;
1034 OBD_ALLOC(imp, sizeof(*imp));
1038 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1039 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1040 INIT_LIST_HEAD(&imp->imp_replay_list);
1041 INIT_LIST_HEAD(&imp->imp_sending_list);
1042 INIT_LIST_HEAD(&imp->imp_delayed_list);
1043 INIT_LIST_HEAD(&imp->imp_committed_list);
1044 imp->imp_replay_cursor = &imp->imp_committed_list;
1045 spin_lock_init(&imp->imp_lock);
1046 imp->imp_last_success_conn = 0;
1047 imp->imp_state = LUSTRE_IMP_NEW;
1048 imp->imp_obd = class_incref(obd, "import", imp);
1049 mutex_init(&imp->imp_sec_mutex);
1050 init_waitqueue_head(&imp->imp_recovery_waitq);
1052 atomic_set(&imp->imp_refcount, 2);
1053 atomic_set(&imp->imp_unregistering, 0);
1054 atomic_set(&imp->imp_inflight, 0);
1055 atomic_set(&imp->imp_replay_inflight, 0);
1056 atomic_set(&imp->imp_inval_count, 0);
1057 INIT_LIST_HEAD(&imp->imp_conn_list);
1058 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1059 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1060 init_imp_at(&imp->imp_at);
1062 /* the default magic is V2, will be used in connect RPC, and
1063 * then adjusted according to the flags in request/reply. */
1064 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1068 EXPORT_SYMBOL(class_new_import);
1070 void class_destroy_import(struct obd_import *import)
1072 LASSERT(import != NULL);
1073 LASSERT(import != LP_POISON);
1075 class_handle_unhash(&import->imp_handle);
1077 spin_lock(&import->imp_lock);
1078 import->imp_generation++;
1079 spin_unlock(&import->imp_lock);
1080 class_import_put(import);
1082 EXPORT_SYMBOL(class_destroy_import);
1084 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1086 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1088 spin_lock(&exp->exp_locks_list_guard);
1090 LASSERT(lock->l_exp_refs_nr >= 0);
1092 if (lock->l_exp_refs_target != NULL &&
1093 lock->l_exp_refs_target != exp) {
1094 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1095 exp, lock, lock->l_exp_refs_target);
1097 if ((lock->l_exp_refs_nr ++) == 0) {
1098 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1099 lock->l_exp_refs_target = exp;
1101 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1102 lock, exp, lock->l_exp_refs_nr);
1103 spin_unlock(&exp->exp_locks_list_guard);
1106 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1108 spin_lock(&exp->exp_locks_list_guard);
1109 LASSERT(lock->l_exp_refs_nr > 0);
1110 if (lock->l_exp_refs_target != exp) {
1111 LCONSOLE_WARN("lock %p, "
1112 "mismatching export pointers: %p, %p\n",
1113 lock, lock->l_exp_refs_target, exp);
1115 if (-- lock->l_exp_refs_nr == 0) {
1116 list_del_init(&lock->l_exp_refs_link);
1117 lock->l_exp_refs_target = NULL;
1119 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1120 lock, exp, lock->l_exp_refs_nr);
1121 spin_unlock(&exp->exp_locks_list_guard);
1125 /* A connection defines an export context in which preallocation can
1126 be managed. This releases the export pointer reference, and returns
1127 the export handle, so the export refcount is 1 when this function
1129 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1130 struct obd_uuid *cluuid)
1132 struct obd_export *export;
1133 LASSERT(conn != NULL);
1134 LASSERT(obd != NULL);
1135 LASSERT(cluuid != NULL);
1138 export = class_new_export(obd, cluuid);
1140 RETURN(PTR_ERR(export));
1142 conn->cookie = export->exp_handle.h_cookie;
1143 class_export_put(export);
1145 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1146 cluuid->uuid, conn->cookie);
1149 EXPORT_SYMBOL(class_connect);
1151 /* if export is involved in recovery then clean up related things */
1152 static void class_export_recovery_cleanup(struct obd_export *exp)
1154 struct obd_device *obd = exp->exp_obd;
1156 spin_lock(&obd->obd_recovery_task_lock);
1157 if (obd->obd_recovering) {
1158 if (exp->exp_in_recovery) {
1159 spin_lock(&exp->exp_lock);
1160 exp->exp_in_recovery = 0;
1161 spin_unlock(&exp->exp_lock);
1162 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1163 atomic_dec(&obd->obd_connected_clients);
1166 /* if called during recovery then should update
1167 * obd_stale_clients counter,
1168 * lightweight exports are not counted */
1169 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1170 exp->exp_obd->obd_stale_clients++;
1172 spin_unlock(&obd->obd_recovery_task_lock);
1174 spin_lock(&exp->exp_lock);
1175 /** Cleanup req replay fields */
1176 if (exp->exp_req_replay_needed) {
1177 exp->exp_req_replay_needed = 0;
1179 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1180 atomic_dec(&obd->obd_req_replay_clients);
1183 /** Cleanup lock replay data */
1184 if (exp->exp_lock_replay_needed) {
1185 exp->exp_lock_replay_needed = 0;
1187 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1188 atomic_dec(&obd->obd_lock_replay_clients);
1190 spin_unlock(&exp->exp_lock);
1193 /* This function removes 1-3 references from the export:
1194 * 1 - for export pointer passed
1195 * and if disconnect really need
1196 * 2 - removing from hash
1197 * 3 - in client_unlink_export
1198 * The export pointer passed to this function can destroyed */
1199 int class_disconnect(struct obd_export *export)
1201 int already_disconnected;
1204 if (export == NULL) {
1205 CWARN("attempting to free NULL export %p\n", export);
1209 spin_lock(&export->exp_lock);
1210 already_disconnected = export->exp_disconnected;
1211 export->exp_disconnected = 1;
1212 spin_unlock(&export->exp_lock);
1214 /* class_cleanup(), abort_recovery(), and class_fail_export()
1215 * all end up in here, and if any of them race we shouldn't
1216 * call extra class_export_puts(). */
1217 if (already_disconnected) {
1218 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1219 GOTO(no_disconn, already_disconnected);
1222 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1223 export->exp_handle.h_cookie);
1225 if (!hlist_unhashed(&export->exp_nid_hash))
1226 cfs_hash_del(export->exp_obd->obd_nid_hash,
1227 &export->exp_connection->c_peer.nid,
1228 &export->exp_nid_hash);
1230 class_export_recovery_cleanup(export);
1231 class_unlink_export(export);
1233 class_export_put(export);
1236 EXPORT_SYMBOL(class_disconnect);
1238 /* Return non-zero for a fully connected export */
1239 int class_connected_export(struct obd_export *exp)
1244 spin_lock(&exp->exp_lock);
1245 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1246 spin_unlock(&exp->exp_lock);
1250 EXPORT_SYMBOL(class_connected_export);
1252 static void class_disconnect_export_list(struct list_head *list,
1253 enum obd_option flags)
1256 struct obd_export *exp;
1259 /* It's possible that an export may disconnect itself, but
1260 * nothing else will be added to this list. */
1261 while (!list_empty(list)) {
1262 exp = list_entry(list->next, struct obd_export,
1264 /* need for safe call CDEBUG after obd_disconnect */
1265 class_export_get(exp);
1267 spin_lock(&exp->exp_lock);
1268 exp->exp_flags = flags;
1269 spin_unlock(&exp->exp_lock);
1271 if (obd_uuid_equals(&exp->exp_client_uuid,
1272 &exp->exp_obd->obd_uuid)) {
1274 "exp %p export uuid == obd uuid, don't discon\n",
1276 /* Need to delete this now so we don't end up pointing
1277 * to work_list later when this export is cleaned up. */
1278 list_del_init(&exp->exp_obd_chain);
1279 class_export_put(exp);
1283 class_export_get(exp);
1284 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1285 "last request at "CFS_TIME_T"\n",
1286 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1287 exp, exp->exp_last_request_time);
1288 /* release one export reference anyway */
1289 rc = obd_disconnect(exp);
1291 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1292 obd_export_nid2str(exp), exp, rc);
1293 class_export_put(exp);
1298 void class_disconnect_exports(struct obd_device *obd)
1300 struct list_head work_list;
1303 /* Move all of the exports from obd_exports to a work list, en masse. */
1304 INIT_LIST_HEAD(&work_list);
1305 spin_lock(&obd->obd_dev_lock);
1306 list_splice_init(&obd->obd_exports, &work_list);
1307 list_splice_init(&obd->obd_delayed_exports, &work_list);
1308 spin_unlock(&obd->obd_dev_lock);
1310 if (!list_empty(&work_list)) {
1311 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1312 "disconnecting them\n", obd->obd_minor, obd);
1313 class_disconnect_export_list(&work_list,
1314 exp_flags_from_obd(obd));
1316 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1317 obd->obd_minor, obd);
1320 EXPORT_SYMBOL(class_disconnect_exports);
1322 /* Remove exports that have not completed recovery.
1324 void class_disconnect_stale_exports(struct obd_device *obd,
1325 int (*test_export)(struct obd_export *))
1327 struct list_head work_list;
1328 struct obd_export *exp, *n;
1332 INIT_LIST_HEAD(&work_list);
1333 spin_lock(&obd->obd_dev_lock);
1334 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1336 /* don't count self-export as client */
1337 if (obd_uuid_equals(&exp->exp_client_uuid,
1338 &exp->exp_obd->obd_uuid))
1341 /* don't evict clients which have no slot in last_rcvd
1342 * (e.g. lightweight connection) */
1343 if (exp->exp_target_data.ted_lr_idx == -1)
1346 spin_lock(&exp->exp_lock);
1347 if (exp->exp_failed || test_export(exp)) {
1348 spin_unlock(&exp->exp_lock);
1351 exp->exp_failed = 1;
1352 spin_unlock(&exp->exp_lock);
1354 list_move(&exp->exp_obd_chain, &work_list);
1356 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1357 obd->obd_name, exp->exp_client_uuid.uuid,
1358 exp->exp_connection == NULL ? "<unknown>" :
1359 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1360 print_export_data(exp, "EVICTING", 0);
1362 spin_unlock(&obd->obd_dev_lock);
1365 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1366 obd->obd_name, evicted);
1368 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1369 OBD_OPT_ABORT_RECOV);
1372 EXPORT_SYMBOL(class_disconnect_stale_exports);
1374 void class_fail_export(struct obd_export *exp)
1376 int rc, already_failed;
1378 spin_lock(&exp->exp_lock);
1379 already_failed = exp->exp_failed;
1380 exp->exp_failed = 1;
1381 spin_unlock(&exp->exp_lock);
1383 if (already_failed) {
1384 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1385 exp, exp->exp_client_uuid.uuid);
1389 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1390 exp, exp->exp_client_uuid.uuid);
1392 if (obd_dump_on_timeout)
1393 libcfs_debug_dumplog();
1395 /* need for safe call CDEBUG after obd_disconnect */
1396 class_export_get(exp);
1398 /* Most callers into obd_disconnect are removing their own reference
1399 * (request, for example) in addition to the one from the hash table.
1400 * We don't have such a reference here, so make one. */
1401 class_export_get(exp);
1402 rc = obd_disconnect(exp);
1404 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1406 CDEBUG(D_HA, "disconnected export %p/%s\n",
1407 exp, exp->exp_client_uuid.uuid);
1408 class_export_put(exp);
1410 EXPORT_SYMBOL(class_fail_export);
1412 char *obd_export_nid2str(struct obd_export *exp)
1414 if (exp->exp_connection != NULL)
1415 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1419 EXPORT_SYMBOL(obd_export_nid2str);
1421 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1423 struct cfs_hash *nid_hash;
1424 struct obd_export *doomed_exp = NULL;
1425 int exports_evicted = 0;
1427 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1429 spin_lock(&obd->obd_dev_lock);
1430 /* umount has run already, so evict thread should leave
1431 * its task to umount thread now */
1432 if (obd->obd_stopping) {
1433 spin_unlock(&obd->obd_dev_lock);
1434 return exports_evicted;
1436 nid_hash = obd->obd_nid_hash;
1437 cfs_hash_getref(nid_hash);
1438 spin_unlock(&obd->obd_dev_lock);
1441 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1442 if (doomed_exp == NULL)
1445 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1446 "nid %s found, wanted nid %s, requested nid %s\n",
1447 obd_export_nid2str(doomed_exp),
1448 libcfs_nid2str(nid_key), nid);
1449 LASSERTF(doomed_exp != obd->obd_self_export,
1450 "self-export is hashed by NID?\n");
1452 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1453 "request\n", obd->obd_name,
1454 obd_uuid2str(&doomed_exp->exp_client_uuid),
1455 obd_export_nid2str(doomed_exp));
1456 class_fail_export(doomed_exp);
1457 class_export_put(doomed_exp);
1460 cfs_hash_putref(nid_hash);
1462 if (!exports_evicted)
1463 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1464 obd->obd_name, nid);
1465 return exports_evicted;
1467 EXPORT_SYMBOL(obd_export_evict_by_nid);
1469 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1471 struct cfs_hash *uuid_hash;
1472 struct obd_export *doomed_exp = NULL;
1473 struct obd_uuid doomed_uuid;
1474 int exports_evicted = 0;
1476 spin_lock(&obd->obd_dev_lock);
1477 if (obd->obd_stopping) {
1478 spin_unlock(&obd->obd_dev_lock);
1479 return exports_evicted;
1481 uuid_hash = obd->obd_uuid_hash;
1482 cfs_hash_getref(uuid_hash);
1483 spin_unlock(&obd->obd_dev_lock);
1485 obd_str2uuid(&doomed_uuid, uuid);
1486 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1487 CERROR("%s: can't evict myself\n", obd->obd_name);
1488 cfs_hash_putref(uuid_hash);
1489 return exports_evicted;
1492 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1494 if (doomed_exp == NULL) {
1495 CERROR("%s: can't disconnect %s: no exports found\n",
1496 obd->obd_name, uuid);
1498 CWARN("%s: evicting %s at adminstrative request\n",
1499 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1500 class_fail_export(doomed_exp);
1501 class_export_put(doomed_exp);
1504 cfs_hash_putref(uuid_hash);
1506 return exports_evicted;
1509 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1510 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1513 static void print_export_data(struct obd_export *exp, const char *status,
1516 struct ptlrpc_reply_state *rs;
1517 struct ptlrpc_reply_state *first_reply = NULL;
1520 spin_lock(&exp->exp_lock);
1521 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1527 spin_unlock(&exp->exp_lock);
1529 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1530 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1531 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1532 atomic_read(&exp->exp_rpc_count),
1533 atomic_read(&exp->exp_cb_count),
1534 atomic_read(&exp->exp_locks_count),
1535 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1536 nreplies, first_reply, nreplies > 3 ? "..." : "",
1537 exp->exp_last_committed);
1538 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1539 if (locks && class_export_dump_hook != NULL)
1540 class_export_dump_hook(exp);
1544 void dump_exports(struct obd_device *obd, int locks)
1546 struct obd_export *exp;
1548 spin_lock(&obd->obd_dev_lock);
1549 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1550 print_export_data(exp, "ACTIVE", locks);
1551 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1552 print_export_data(exp, "UNLINKED", locks);
1553 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1554 print_export_data(exp, "DELAYED", locks);
1555 spin_unlock(&obd->obd_dev_lock);
1556 spin_lock(&obd_zombie_impexp_lock);
1557 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1558 print_export_data(exp, "ZOMBIE", locks);
1559 spin_unlock(&obd_zombie_impexp_lock);
1562 void obd_exports_barrier(struct obd_device *obd)
1565 LASSERT(list_empty(&obd->obd_exports));
1566 spin_lock(&obd->obd_dev_lock);
1567 while (!list_empty(&obd->obd_unlinked_exports)) {
1568 spin_unlock(&obd->obd_dev_lock);
1569 set_current_state(TASK_UNINTERRUPTIBLE);
1570 schedule_timeout(cfs_time_seconds(waited));
1571 if (waited > 5 && IS_PO2(waited)) {
1572 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1573 "more than %d seconds. "
1574 "The obd refcount = %d. Is it stuck?\n",
1575 obd->obd_name, waited,
1576 atomic_read(&obd->obd_refcount));
1577 dump_exports(obd, 1);
1580 spin_lock(&obd->obd_dev_lock);
1582 spin_unlock(&obd->obd_dev_lock);
1584 EXPORT_SYMBOL(obd_exports_barrier);
1586 /* Total amount of zombies to be destroyed */
1587 static int zombies_count = 0;
1590 * kill zombie imports and exports
1592 void obd_zombie_impexp_cull(void)
1594 struct obd_import *import;
1595 struct obd_export *export;
1599 spin_lock(&obd_zombie_impexp_lock);
1602 if (!list_empty(&obd_zombie_imports)) {
1603 import = list_entry(obd_zombie_imports.next,
1606 list_del_init(&import->imp_zombie_chain);
1610 if (!list_empty(&obd_zombie_exports)) {
1611 export = list_entry(obd_zombie_exports.next,
1614 list_del_init(&export->exp_obd_chain);
1617 spin_unlock(&obd_zombie_impexp_lock);
1619 if (import != NULL) {
1620 class_import_destroy(import);
1621 spin_lock(&obd_zombie_impexp_lock);
1623 spin_unlock(&obd_zombie_impexp_lock);
1626 if (export != NULL) {
1627 class_export_destroy(export);
1628 spin_lock(&obd_zombie_impexp_lock);
1630 spin_unlock(&obd_zombie_impexp_lock);
1634 } while (import != NULL || export != NULL);
1638 static struct completion obd_zombie_start;
1639 static struct completion obd_zombie_stop;
1640 static unsigned long obd_zombie_flags;
1641 static wait_queue_head_t obd_zombie_waitq;
1642 static pid_t obd_zombie_pid;
1645 OBD_ZOMBIE_STOP = 0x0001,
1649 * check for work for kill zombie import/export thread.
1651 static int obd_zombie_impexp_check(void *arg)
1655 spin_lock(&obd_zombie_impexp_lock);
1656 rc = (zombies_count == 0) &&
1657 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1658 spin_unlock(&obd_zombie_impexp_lock);
1664 * Add export to the obd_zombe thread and notify it.
1666 static void obd_zombie_export_add(struct obd_export *exp) {
1667 spin_lock(&exp->exp_obd->obd_dev_lock);
1668 LASSERT(!list_empty(&exp->exp_obd_chain));
1669 list_del_init(&exp->exp_obd_chain);
1670 spin_unlock(&exp->exp_obd->obd_dev_lock);
1671 spin_lock(&obd_zombie_impexp_lock);
1673 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1674 spin_unlock(&obd_zombie_impexp_lock);
1676 obd_zombie_impexp_notify();
1680 * Add import to the obd_zombe thread and notify it.
1682 static void obd_zombie_import_add(struct obd_import *imp) {
1683 LASSERT(imp->imp_sec == NULL);
1684 LASSERT(imp->imp_rq_pool == NULL);
1685 spin_lock(&obd_zombie_impexp_lock);
1686 LASSERT(list_empty(&imp->imp_zombie_chain));
1688 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1689 spin_unlock(&obd_zombie_impexp_lock);
1691 obd_zombie_impexp_notify();
1695 * notify import/export destroy thread about new zombie.
1697 static void obd_zombie_impexp_notify(void)
1700 * Make sure obd_zomebie_impexp_thread get this notification.
1701 * It is possible this signal only get by obd_zombie_barrier, and
1702 * barrier gulps this notification and sleeps away and hangs ensues
1704 wake_up_all(&obd_zombie_waitq);
1708 * check whether obd_zombie is idle
1710 static int obd_zombie_is_idle(void)
1714 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1715 spin_lock(&obd_zombie_impexp_lock);
1716 rc = (zombies_count == 0);
1717 spin_unlock(&obd_zombie_impexp_lock);
1722 * wait when obd_zombie import/export queues become empty
1724 void obd_zombie_barrier(void)
1726 struct l_wait_info lwi = { 0 };
1728 if (obd_zombie_pid == current_pid())
1729 /* don't wait for myself */
1731 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1733 EXPORT_SYMBOL(obd_zombie_barrier);
1737 * destroy zombie export/import thread.
1739 static int obd_zombie_impexp_thread(void *unused)
1741 unshare_fs_struct();
1742 complete(&obd_zombie_start);
1744 obd_zombie_pid = current_pid();
1746 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1747 struct l_wait_info lwi = { 0 };
1749 l_wait_event(obd_zombie_waitq,
1750 !obd_zombie_impexp_check(NULL), &lwi);
1751 obd_zombie_impexp_cull();
1754 * Notify obd_zombie_barrier callers that queues
1757 wake_up(&obd_zombie_waitq);
1760 complete(&obd_zombie_stop);
1767 * start destroy zombie import/export thread
1769 int obd_zombie_impexp_init(void)
1771 struct task_struct *task;
1773 INIT_LIST_HEAD(&obd_zombie_imports);
1775 INIT_LIST_HEAD(&obd_zombie_exports);
1776 spin_lock_init(&obd_zombie_impexp_lock);
1777 init_completion(&obd_zombie_start);
1778 init_completion(&obd_zombie_stop);
1779 init_waitqueue_head(&obd_zombie_waitq);
1782 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1784 RETURN(PTR_ERR(task));
1786 wait_for_completion(&obd_zombie_start);
1790 * stop destroy zombie import/export thread
1792 void obd_zombie_impexp_stop(void)
1794 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1795 obd_zombie_impexp_notify();
1796 wait_for_completion(&obd_zombie_stop);
1799 /***** Kernel-userspace comm helpers *******/
1801 /* Get length of entire message, including header */
1802 int kuc_len(int payload_len)
1804 return sizeof(struct kuc_hdr) + payload_len;
1806 EXPORT_SYMBOL(kuc_len);
1808 /* Get a pointer to kuc header, given a ptr to the payload
1809 * @param p Pointer to payload area
1810 * @returns Pointer to kuc header
1812 struct kuc_hdr * kuc_ptr(void *p)
1814 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1815 LASSERT(lh->kuc_magic == KUC_MAGIC);
1818 EXPORT_SYMBOL(kuc_ptr);
1820 /* Test if payload is part of kuc message
1821 * @param p Pointer to payload area
1824 int kuc_ispayload(void *p)
1826 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1828 if (kh->kuc_magic == KUC_MAGIC)
1833 EXPORT_SYMBOL(kuc_ispayload);
1835 /* Alloc space for a message, and fill in header
1836 * @return Pointer to payload area
1838 void *kuc_alloc(int payload_len, int transport, int type)
1841 int len = kuc_len(payload_len);
1845 return ERR_PTR(-ENOMEM);
1847 lh->kuc_magic = KUC_MAGIC;
1848 lh->kuc_transport = transport;
1849 lh->kuc_msgtype = type;
1850 lh->kuc_msglen = len;
1852 return (void *)(lh + 1);
1854 EXPORT_SYMBOL(kuc_alloc);
1856 /* Takes pointer to payload area */
1857 inline void kuc_free(void *p, int payload_len)
1859 struct kuc_hdr *lh = kuc_ptr(p);
1860 OBD_FREE(lh, kuc_len(payload_len));
1862 EXPORT_SYMBOL(kuc_free);
1864 struct obd_request_slot_waiter {
1865 struct list_head orsw_entry;
1866 wait_queue_head_t orsw_waitq;
1870 static bool obd_request_slot_avail(struct client_obd *cli,
1871 struct obd_request_slot_waiter *orsw)
1875 spin_lock(&cli->cl_loi_list_lock);
1876 avail = !!list_empty(&orsw->orsw_entry);
1877 spin_unlock(&cli->cl_loi_list_lock);
1883 * For network flow control, the RPC sponsor needs to acquire a credit
1884 * before sending the RPC. The credits count for a connection is defined
1885 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1886 * the subsequent RPC sponsors need to wait until others released their
1887 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1889 int obd_get_request_slot(struct client_obd *cli)
1891 struct obd_request_slot_waiter orsw;
1892 struct l_wait_info lwi;
1895 spin_lock(&cli->cl_loi_list_lock);
1896 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1897 cli->cl_r_in_flight++;
1898 spin_unlock(&cli->cl_loi_list_lock);
1902 init_waitqueue_head(&orsw.orsw_waitq);
1903 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1904 orsw.orsw_signaled = false;
1905 spin_unlock(&cli->cl_loi_list_lock);
1907 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1908 rc = l_wait_event(orsw.orsw_waitq,
1909 obd_request_slot_avail(cli, &orsw) ||
1913 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1914 * freed but other (such as obd_put_request_slot) is using it. */
1915 spin_lock(&cli->cl_loi_list_lock);
1917 if (!orsw.orsw_signaled) {
1918 if (list_empty(&orsw.orsw_entry))
1919 cli->cl_r_in_flight--;
1921 list_del(&orsw.orsw_entry);
1925 if (orsw.orsw_signaled) {
1926 LASSERT(list_empty(&orsw.orsw_entry));
1930 spin_unlock(&cli->cl_loi_list_lock);
1934 EXPORT_SYMBOL(obd_get_request_slot);
1936 void obd_put_request_slot(struct client_obd *cli)
1938 struct obd_request_slot_waiter *orsw;
1940 spin_lock(&cli->cl_loi_list_lock);
1941 cli->cl_r_in_flight--;
1943 /* If there is free slot, wakeup the first waiter. */
1944 if (!list_empty(&cli->cl_loi_read_list) &&
1945 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1946 orsw = list_entry(cli->cl_loi_read_list.next,
1947 struct obd_request_slot_waiter, orsw_entry);
1948 list_del_init(&orsw->orsw_entry);
1949 cli->cl_r_in_flight++;
1950 wake_up(&orsw->orsw_waitq);
1952 spin_unlock(&cli->cl_loi_list_lock);
1954 EXPORT_SYMBOL(obd_put_request_slot);
1956 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1958 return cli->cl_max_rpcs_in_flight;
1960 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1962 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1964 struct obd_request_slot_waiter *orsw;
1971 if (max > OBD_MAX_RIF_MAX || max < 1)
1974 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
1975 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
1976 /* adjust max_mod_rpcs_in_flight to ensure it is always
1977 * strictly lower that max_rpcs_in_flight */
1979 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
1980 "because it must be higher than "
1981 "max_mod_rpcs_in_flight value",
1982 cli->cl_import->imp_obd->obd_name);
1985 if (max <= cli->cl_max_mod_rpcs_in_flight) {
1986 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
1992 spin_lock(&cli->cl_loi_list_lock);
1993 old = cli->cl_max_rpcs_in_flight;
1994 cli->cl_max_rpcs_in_flight = max;
1997 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1998 for (i = 0; i < diff; i++) {
1999 if (list_empty(&cli->cl_loi_read_list))
2002 orsw = list_entry(cli->cl_loi_read_list.next,
2003 struct obd_request_slot_waiter, orsw_entry);
2004 list_del_init(&orsw->orsw_entry);
2005 cli->cl_r_in_flight++;
2006 wake_up(&orsw->orsw_waitq);
2008 spin_unlock(&cli->cl_loi_list_lock);
2012 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2014 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2016 return cli->cl_max_mod_rpcs_in_flight;
2018 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2020 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2022 struct obd_connect_data *ocd;
2025 if (max > OBD_MAX_RIF_MAX || max < 1)
2028 /* cannot exceed or equal max_rpcs_in_flight */
2029 if (max >= cli->cl_max_rpcs_in_flight) {
2030 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2031 "higher or equal to max_rpcs_in_flight value (%u)\n",
2032 cli->cl_import->imp_obd->obd_name,
2033 max, cli->cl_max_rpcs_in_flight);
2037 /* cannot exceed max modify RPCs in flight supported by the server */
2038 ocd = &cli->cl_import->imp_connect_data;
2039 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2040 maxmodrpcs = ocd->ocd_maxmodrpcs;
2043 if (max > maxmodrpcs) {
2044 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2045 "higher than max_mod_rpcs_per_client value (%hu) "
2046 "returned by the server at connection\n",
2047 cli->cl_import->imp_obd->obd_name,
2052 cli->cl_max_mod_rpcs_in_flight = max;
2054 /* will have to wakeup waiters if max has been increased */
2058 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);