4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <linux/kthread.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_kernelcomm.h>
/* Taken around walks and updates of the obd_types list (see
 * class_search_type(), class_register_type(), class_unregister_type()). */
49 spinlock_t obd_types_lock;
/* Slab caches created by obd_init_caches() and destroyed by
 * obd_cleanup_caches(). */
51 static struct kmem_cache *obd_device_cachep;
52 struct kmem_cache *obdo_cachep;
53 EXPORT_SYMBOL(obdo_cachep);
54 static struct kmem_cache *import_cachep;
/* Lists of imports/exports handed over by obd_zombie_*_add() on the final
 * put. NOTE(review): presumably obd_zombie_impexp_lock guards both lists;
 * the zombie code is outside this view - confirm. */
56 static struct list_head obd_zombie_imports;
57 static struct list_head obd_zombie_exports;
58 static spinlock_t obd_zombie_impexp_lock;
/* Forward declarations; the definitions are not in the visible part of
 * the file. */
60 static void obd_zombie_impexp_notify(void);
61 static void obd_zombie_export_add(struct obd_export *exp);
62 static void obd_zombie_import_add(struct obd_import *imp);
63 static void print_export_data(struct obd_export *exp,
64 const char *status, int locks);
/* Hook used by class_export_destroy() and class_import_destroy() to drop
 * ptlrpc_connection references. NOTE(review): presumably assigned by the
 * ptlrpc module - confirm. */
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
70 * support functions: we could use inter-module communication, but this
71 * is more portable to other OS's
73 static struct obd_device *obd_device_alloc(void)
75 struct obd_device *obd;
77 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
79 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 * Checks the magic value, reports a namespace that is still attached,
 * finalizes obd_reference and frees the slab object.
 */
84 static void obd_device_free(struct obd_device *obd)
87 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* a namespace still attached here means cleanup was incomplete */
89 if (obd->obd_namespace != NULL) {
90 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91 obd, obd->obd_namespace, obd->obd_force);
94 lu_ref_fini(&obd->obd_reference);
95 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
98 struct obd_type *class_search_type(const char *name)
100 struct list_head *tmp;
101 struct obd_type *type;
103 spin_lock(&obd_types_lock);
104 list_for_each(tmp, &obd_types) {
105 type = list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
107 spin_unlock(&obd_types_lock);
111 spin_unlock(&obd_types_lock);
114 EXPORT_SYMBOL(class_search_type);
/*
 * Look up an obd type by name and pin the module that owns its obd_ops.
 * With HAVE_MODULE_LOADING_SUPPORT, request_module() is used to load the
 * backing module and the lookup is repeated.
 * NOTE(review): short lines (guards, the refcount update, the return) are
 * absent from this listing.
 */
116 struct obd_type *class_get_type(const char *name)
118 struct obd_type *type = class_search_type(name);
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
122 const char *modname = name;
/* some type names are served by a differently named module */
124 if (strcmp(modname, "obdfilter") == 0)
127 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
128 modname = LUSTRE_OSP_NAME;
/* prefix match: any name starting with LUSTRE_MDS_NAME maps to the MDT module */
130 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
131 modname = LUSTRE_MDT_NAME;
/* a zero return from request_module() is treated as success */
133 if (!request_module("%s", modname)) {
134 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
135 type = class_search_type(name);
137 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
/* pin the owning module while holding the per-type lock; the result of
 * try_module_get() is not checked */
143 spin_lock(&type->obd_type_lock);
145 try_module_get(type->typ_dt_ops->o_owner);
146 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): drops the module reference on the
 * owner of the type's obd_ops while holding obd_type_lock.
 */
151 void class_put_type(struct obd_type *type)
154 spin_lock(&type->obd_type_lock);
156 module_put(type->typ_dt_ops->o_owner);
157 spin_unlock(&type->obd_type_lock);
160 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type called \a name.
 * Copies \a dt_ops and \a md_ops into freshly allocated storage, registers
 * a procfs entry via lprocfs_register() under CONFIG_PROC_FS, initializes
 * \a ldt with lu_device_type_init() and links the type onto obd_types.
 * NOTE(review): several short lines (guards, returns, the error label) are
 * absent from this listing; the notes below describe only what is visible.
 */
162 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
163 bool enable_proc, struct lprocfs_vars *vars,
164 const char *name, struct lu_device_type *ldt)
166 struct obd_type *type;
/* name must be NUL-terminated within CLASS_MAX_NAME bytes */
171 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* a type name may be registered only once */
173 if (class_search_type(name)) {
174 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
179 OBD_ALLOC(type, sizeof(*type));
/* private copies of both ops tables plus the type name */
183 OBD_ALLOC_PTR(type->typ_dt_ops);
184 OBD_ALLOC_PTR(type->typ_md_ops);
185 OBD_ALLOC(type->typ_name, strlen(name) + 1);
187 if (type->typ_dt_ops == NULL ||
188 type->typ_md_ops == NULL ||
189 type->typ_name == NULL)
192 *(type->typ_dt_ops) = *dt_ops;
193 /* md_ops is optional */
195 *(type->typ_md_ops) = *md_ops;
/* typ_name was sized strlen(name) + 1 above, so strcpy() fits exactly */
196 strcpy(type->typ_name, name);
197 spin_lock_init(&type->obd_type_lock);
199 #ifdef CONFIG_PROC_FS
201 type->typ_procroot = lprocfs_register(type->typ_name,
/* lprocfs_register() reports failure through an ERR_PTR value */
204 if (IS_ERR(type->typ_procroot)) {
205 rc = PTR_ERR(type->typ_procroot);
206 type->typ_procroot = NULL;
213 rc = lu_device_type_init(ldt);
/* publish the initialized type on the global list */
218 spin_lock(&obd_types_lock);
219 list_add(&type->typ_chain, &obd_types);
220 spin_unlock(&obd_types_lock);
/* presumably the error-unwind path (its label is not visible here):
 * releases whatever was set up before the failure */
225 if (type->typ_name != NULL) {
226 #ifdef CONFIG_PROC_FS
227 if (type->typ_procroot != NULL)
228 remove_proc_subtree(type->typ_name, proc_lustre_root);
230 OBD_FREE(type->typ_name, strlen(name) + 1);
232 if (type->typ_md_ops != NULL)
233 OBD_FREE_PTR(type->typ_md_ops);
234 if (type->typ_dt_ops != NULL)
235 OBD_FREE_PTR(type->typ_dt_ops);
236 OBD_FREE(type, sizeof(*type));
239 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name: remove its proc entries,
 * finalize its lu device type, unlink it from obd_types and free it.
 * A type whose typ_refcnt is non-zero is not unlinked; only its ops tables
 * are freed (see the comments in that branch).
 * NOTE(review): in that branch the type stays on obd_types with
 * typ_dt_ops/typ_md_ops already freed, while class_get_type() dereferences
 * typ_dt_ops - confirm this cannot be reached afterwards.
 */
241 int class_unregister_type(const char *name)
243 struct obd_type *type = class_search_type(name);
247 CERROR("unknown obd type\n");
251 if (type->typ_refcnt) {
252 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
253 /* This is a bad situation, let's make the best of it */
254 /* Remove ops, but leave the name for debugging */
255 OBD_FREE_PTR(type->typ_dt_ops);
256 OBD_FREE_PTR(type->typ_md_ops);
260 /* we do not use type->typ_procroot as for compatibility purposes
261 * other modules can share names (i.e. lod can use lov entry). so
262 * we can't reference pointer as it can get invalided when another
263 * module removes the entry */
264 #ifdef CONFIG_PROC_FS
265 if (type->typ_procroot != NULL)
266 remove_proc_subtree(type->typ_name, proc_lustre_root);
267 if (type->typ_procsym != NULL)
268 lprocfs_remove(&type->typ_procsym);
271 lu_device_type_fini(type->typ_lu);
273 spin_lock(&obd_types_lock);
274 list_del(&type->typ_chain);
275 spin_unlock(&obd_types_lock);
/* the size is computed from \a name, which compared equal to typ_name in
 * class_search_type() */
276 OBD_FREE(type->typ_name, strlen(name) + 1);
277 if (type->typ_dt_ops != NULL)
278 OBD_FREE_PTR(type->typ_dt_ops);
279 if (type->typ_md_ops != NULL)
280 OBD_FREE_PTR(type->typ_md_ops);
281 OBD_FREE(type, sizeof(*type));
283 } /* class_unregister_type */
284 EXPORT_SYMBOL(class_unregister_type);
287 * Create a new obd device.
289 * Find an empty slot in ::obd_devs[], create a new obd device in it.
291 * \param[in] type_name obd device type string.
292 * \param[in] name obd device name.
294 * \retval NULL if create fails, otherwise return the obd device
/* NOTE(review): short lines (guards, labels, returns) are absent from this
 * listing; the notes below describe only the visible statements. */
297 struct obd_device *class_newdev(const char *type_name, const char *name)
299 struct obd_device *result = NULL;
300 struct obd_device *newdev;
301 struct obd_type *type = NULL;
303 int new_obd_minor = 0;
/* reject names that do not fit obd_name */
306 if (strlen(name) >= MAX_OBD_NAME) {
307 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
308 RETURN(ERR_PTR(-EINVAL));
/* the type reference is released by class_put_type() on failure */
311 type = class_get_type(type_name);
313 CERROR("OBD: unknown type: %s\n", type_name);
314 RETURN(ERR_PTR(-ENODEV));
317 newdev = obd_device_alloc();
319 GOTO(out_type, result = ERR_PTR(-ENOMEM));
321 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* obd_dev_lock is write-held while the table is scanned and a slot claimed */
323 write_lock(&obd_dev_lock);
324 for (i = 0; i < class_devno_max(); i++) {
325 struct obd_device *obd = class_num2obd(i);
/* device names must be unique */
327 if (obd && (strcmp(name, obd->obd_name) == 0)) {
328 CERROR("Device %s already exists at %d, won't add\n",
/* undo the slot that was claimed earlier in this scan */
331 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
332 "%p obd_magic %08x != %08x\n", result,
333 result->obd_magic, OBD_DEVICE_MAGIC);
334 LASSERTF(result->obd_minor == new_obd_minor,
335 "%p obd_minor %d != %d\n", result,
336 result->obd_minor, new_obd_minor);
338 obd_devs[result->obd_minor] = NULL;
339 result->obd_name[0]='\0';
341 result = ERR_PTR(-EEXIST);
/* first empty slot seen while nothing is claimed yet: take it */
344 if (!result && !obd) {
346 result->obd_minor = i;
348 result->obd_type = type;
/* the length of name was checked against MAX_OBD_NAME above */
349 strncpy(result->obd_name, name,
350 sizeof(result->obd_name) - 1);
351 obd_devs[i] = result;
354 write_unlock(&obd_dev_lock);
/* the scan finished without finding a free slot */
356 if (result == NULL && i >= class_devno_max()) {
357 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
359 GOTO(out, result = ERR_PTR(-EOVERFLOW));
365 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
366 result->obd_name, result);
/* failure paths: free the unused device and drop the type reference */
370 obd_device_free(newdev);
372 class_put_type(type);
376 void class_release_dev(struct obd_device *obd)
378 struct obd_type *obd_type = obd->obd_type;
380 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
381 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
382 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
383 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
384 LASSERT(obd_type != NULL);
386 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
387 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
389 write_lock(&obd_dev_lock);
390 obd_devs[obd->obd_minor] = NULL;
391 write_unlock(&obd_dev_lock);
392 obd_device_free(obd);
394 class_put_type(obd_type);
/*
 * Look up a device by name in ::obd_devs[] with obd_dev_lock read-held.
 * class_name2obd() uses the result as an ::obd_devs[] index and treats a
 * negative value as "not found".
 * NOTE(review): the return statements are absent from this listing.
 */
397 int class_name2dev(const char *name)
404 read_lock(&obd_dev_lock);
405 for (i = 0; i < class_devno_max(); i++) {
406 struct obd_device *obd = class_num2obd(i);
408 if (obd && strcmp(name, obd->obd_name) == 0) {
409 /* Make sure we finished attaching before we give
410 out any references */
411 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
412 if (obd->obd_attached) {
413 read_unlock(&obd_dev_lock);
419 read_unlock(&obd_dev_lock);
424 struct obd_device *class_name2obd(const char *name)
426 int dev = class_name2dev(name);
428 if (dev < 0 || dev > class_devno_max())
430 return class_num2obd(dev);
432 EXPORT_SYMBOL(class_name2obd);
/*
 * Scan ::obd_devs[] with obd_dev_lock read-held for a device whose
 * obd_uuid equals \a uuid.  class_uuid2obd() passes the result on to
 * class_num2obd() as an index.
 * NOTE(review): the return statements are absent from this listing.
 */
434 int class_uuid2dev(struct obd_uuid *uuid)
438 read_lock(&obd_dev_lock);
439 for (i = 0; i < class_devno_max(); i++) {
440 struct obd_device *obd = class_num2obd(i);
442 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
443 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
444 read_unlock(&obd_dev_lock);
448 read_unlock(&obd_dev_lock);
/*
 * Map \a uuid to its obd device via class_uuid2dev() and class_num2obd().
 * NOTE(review): the lines between the lookup and the final return are
 * absent from this listing - presumably a "not found" check; confirm.
 */
453 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
455 int dev = class_uuid2dev(uuid);
458 return class_num2obd(dev);
460 EXPORT_SYMBOL(class_uuid2obd);
463 * Get obd device from ::obd_devs[]
465 * \param num [in] array index
467 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
468 * otherwise return the obd device there.
470 struct obd_device *class_num2obd(int num)
472 struct obd_device *obd = NULL;
474 if (num < class_devno_max()) {
479 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
480 "%p obd_magic %08x != %08x\n",
481 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
482 LASSERTF(obd->obd_minor == num,
483 "%p obd_minor %0d != %0d\n",
484 obd, obd->obd_minor, num);
491 * Get obd devices count. Device in any
493 * \retval obd device count
495 int get_devices_count(void)
497 int index, max_index = class_devno_max(), dev_count = 0;
499 read_lock(&obd_dev_lock);
500 for (index = 0; index <= max_index; index++) {
501 struct obd_device *obd = class_num2obd(index);
505 read_unlock(&obd_dev_lock);
509 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per device in ::obd_devs[]: index, status, type
 * name, device name, uuid and reference count.  The table is walked with
 * obd_dev_lock read-held.
 * NOTE(review): the status strings themselves are on lines absent from
 * this listing.
 */
511 void class_obd_list(void)
516 read_lock(&obd_dev_lock);
517 for (i = 0; i < class_devno_max(); i++) {
518 struct obd_device *obd = class_num2obd(i);
/* status precedence: stopping, then set up, then attached */
522 if (obd->obd_stopping)
524 else if (obd->obd_set_up)
526 else if (obd->obd_attached)
530 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
531 i, status, obd->obd_type->typ_name,
532 obd->obd_name, obd->obd_uuid.uuid,
533 atomic_read(&obd->obd_refcount));
535 read_unlock(&obd_dev_lock);
539 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
540 specified, then only the client with that uuid is returned,
541 otherwise any client connected to the tgt is returned. */
542 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
543 const char * typ_name,
544 struct obd_uuid *grp_uuid)
548 read_lock(&obd_dev_lock);
549 for (i = 0; i < class_devno_max(); i++) {
550 struct obd_device *obd = class_num2obd(i);
554 if ((strncmp(obd->obd_type->typ_name, typ_name,
555 strlen(typ_name)) == 0)) {
556 if (obd_uuid_equals(tgt_uuid,
557 &obd->u.cli.cl_target_uuid) &&
558 ((grp_uuid)? obd_uuid_equals(grp_uuid,
559 &obd->obd_uuid) : 1)) {
560 read_unlock(&obd_dev_lock);
565 read_unlock(&obd_dev_lock);
569 EXPORT_SYMBOL(class_find_client_obd);
571 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
572 searching at *next, and if a device is found, the next index to look
573 at is saved in *next. If next is NULL, then the first matching device
574 will always be returned. */
/* NOTE(review): the lines that pick the start index from \a next and that
 * store the follow-up index are absent from this listing. */
575 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
/* a caller-supplied start index is only accepted inside [0, class_devno_max()) */
581 else if (*next >= 0 && *next < class_devno_max())
586 read_lock(&obd_dev_lock);
587 for (; i < class_devno_max(); i++) {
588 struct obd_device *obd = class_num2obd(i);
592 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
595 read_unlock(&obd_dev_lock);
599 read_unlock(&obd_dev_lock);
603 EXPORT_SYMBOL(class_devices_in_group);
606 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
607 * adjust sptlrpc settings accordingly.
/* NOTE(review): the line that combines rc2 into the function's result and
 * the final return are absent from this listing. */
609 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
611 struct obd_device *obd;
615 LASSERT(namelen > 0);
617 read_lock(&obd_dev_lock);
618 for (i = 0; i < class_devno_max(); i++) {
619 obd = class_num2obd(i);
/* only devices that are set up and not stopping are considered */
621 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
624 /* only notify mdc, osc, mdt, ost */
625 type = obd->obd_type->typ_name;
626 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
627 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
628 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
629 strcmp(type, LUSTRE_OST_NAME) != 0)
/* the device name must start with the first namelen bytes of fsname */
632 if (strncmp(obd->obd_name, fsname, namelen))
/* hold a device reference and drop obd_dev_lock around the call into the
 * device; the lock is re-taken before the scan continues */
635 class_incref(obd, __FUNCTION__, obd);
636 read_unlock(&obd_dev_lock);
637 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
638 sizeof(KEY_SPTLRPC_CONF),
639 KEY_SPTLRPC_CONF, 0, NULL, NULL);
641 class_decref(obd, __FUNCTION__, obd);
642 read_lock(&obd_dev_lock);
644 read_unlock(&obd_dev_lock);
647 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches().  Also called from
 * the failure path of obd_init_caches().
 * NOTE(review): the guards around the second and third cache are absent
 * from this listing.
 */
649 void obd_cleanup_caches(void)
652 if (obd_device_cachep) {
653 kmem_cache_destroy(obd_device_cachep);
654 obd_device_cachep = NULL;
657 kmem_cache_destroy(obdo_cachep);
661 kmem_cache_destroy(import_cachep);
662 import_cachep = NULL;
/*
 * Create the slab caches for obd_device, obdo and obd_import objects.
 * Each cache pointer is asserted to be NULL before it is created; a failed
 * creation jumps to "out" with rc = -ENOMEM, where obd_cleanup_caches()
 * tears down whatever was created.
 * NOTE(review): the trailing kmem_cache_create() arguments are on lines
 * absent from this listing.
 */
668 int obd_init_caches(void)
673 LASSERT(obd_device_cachep == NULL);
674 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
675 sizeof(struct obd_device),
677 if (!obd_device_cachep)
678 GOTO(out, rc = -ENOMEM);
680 LASSERT(obdo_cachep == NULL);
681 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
684 GOTO(out, rc = -ENOMEM);
686 LASSERT(import_cachep == NULL);
687 import_cachep = kmem_cache_create("ll_import_cache",
688 sizeof(struct obd_import),
691 GOTO(out, rc = -ENOMEM);
695 obd_cleanup_caches();
699 /* map connection to client */
/*
 * The export is looked up from conn->cookie with class_handle2object().
 * NOTE(review): class_conn2obd() calls class_export_put() on the result,
 * which suggests the lookup returns a referenced export - confirm against
 * class_handle2object().
 */
700 struct obd_export *class_conn2export(struct lustre_handle *conn)
702 struct obd_export *export;
706 CDEBUG(D_CACHE, "looking for null handle\n");
710 if (conn->cookie == -1) { /* this means assign a new connection */
711 CDEBUG(D_CACHE, "want a new connection\n");
715 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
716 export = class_handle2object(conn->cookie, NULL);
719 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to its obd device.
 * NOTE(review): the body is absent from this listing - presumably it
 * returns exp->exp_obd; confirm.
 */
721 struct obd_device *class_exp2obd(struct obd_export *exp)
727 EXPORT_SYMBOL(class_exp2obd);
/*
 * Resolve a connection handle to the obd device behind its export.
 * The export obtained from class_conn2export() is released with
 * class_export_put() once exp_obd has been read.
 */
729 struct obd_device *class_conn2obd(struct lustre_handle *conn)
731 struct obd_export *export;
732 export = class_conn2export(conn);
734 struct obd_device *obd = export->exp_obd;
735 class_export_put(export);
/*
 * Return the client import (u.cli.cl_import) of the obd device that
 * \a exp belongs to.  \a exp itself is dereferenced unconditionally.
 */
741 struct obd_import *class_exp2cliimp(struct obd_export *exp)
743 struct obd_device *obd = exp->exp_obd;
746 return obd->u.cli.cl_import;
748 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the obd device that the
 * connection handle \a conn resolves to via class_conn2obd().
 */
750 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
752 struct obd_device *obd = class_conn2obd(conn);
755 return obd->u.cli.cl_import;
758 /* Export management functions */
/*
 * Final teardown of an export whose reference count is zero: release its
 * connection, check that its request lists are empty, call
 * obd_destroy_export(), drop the "export" reference on the obd device and
 * free the export with OBD_FREE_RCU().
 */
759 static void class_export_destroy(struct obd_export *exp)
761 struct obd_device *obd = exp->exp_obd;
764 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
765 LASSERT(obd != NULL);
767 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
768 exp->exp_client_uuid.uuid, obd->obd_name);
770 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
771 if (exp->exp_connection)
772 ptlrpc_put_connection_superhack(exp->exp_connection);
/* nothing may still be queued on the export at this point */
774 LASSERT(list_empty(&exp->exp_outstanding_replies));
775 LASSERT(list_empty(&exp->exp_uncommitted_replies));
776 LASSERT(list_empty(&exp->exp_req_replay_queue));
777 LASSERT(list_empty(&exp->exp_hp_rpcs));
778 obd_destroy_export(exp);
/* pairs with class_incref(obd, "export", export) in class_new_export() */
779 class_decref(obd, "export", exp);
781 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle callback (see export_handle_ops): take a reference on the export
 * that the opaque handle object points to. */
static void export_handle_addref(void *export)
{
	struct obd_export *exp = export;

	class_export_get(exp);
}
/* Handle callbacks registered for exports through class_handle_hash() in
 * class_new_export(). */
790 static struct portals_handle_ops export_handle_ops = {
791 .hop_addref = export_handle_addref,
795 struct obd_export *class_export_get(struct obd_export *exp)
797 atomic_inc(&exp->exp_refcount);
798 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
799 atomic_read(&exp->exp_refcount));
802 EXPORT_SYMBOL(class_export_get);
804 void class_export_put(struct obd_export *exp)
806 LASSERT(exp != NULL);
807 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
808 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
809 atomic_read(&exp->exp_refcount) - 1);
811 if (atomic_dec_and_test(&exp->exp_refcount)) {
812 LASSERT(!list_empty(&exp->exp_obd_chain));
813 CDEBUG(D_IOCTL, "final put %p/%s\n",
814 exp, exp->exp_client_uuid.uuid);
816 /* release nid stat refererence */
817 lprocfs_exp_cleanup(exp);
819 obd_zombie_export_add(exp);
822 EXPORT_SYMBOL(class_export_put);
824 /* Creates a new export, adds it to the hash table, and returns a
825 * pointer to it. The refcount is 2: one for the hash reference, and
826 * one for the pointer returned by this function. */
/* NOTE(review): short lines (guards, labels, the final returns) are absent
 * from this listing; the notes below describe only the visible statements. */
827 struct obd_export *class_new_export(struct obd_device *obd,
828 struct obd_uuid *cluuid)
830 struct obd_export *export;
831 struct cfs_hash *hash = NULL;
835 OBD_ALLOC_PTR(export);
837 return ERR_PTR(-ENOMEM);
839 export->exp_conn_cnt = 0;
840 export->exp_lock_hash = NULL;
841 export->exp_flock_hash = NULL;
/* two references, as described in the comment above the function */
842 atomic_set(&export->exp_refcount, 2);
843 atomic_set(&export->exp_rpc_count, 0);
844 atomic_set(&export->exp_cb_count, 0);
845 atomic_set(&export->exp_locks_count, 0);
846 #if LUSTRE_TRACKS_LOCK_EXP_REFS
847 INIT_LIST_HEAD(&export->exp_locks_list);
848 spin_lock_init(&export->exp_locks_list_guard);
850 atomic_set(&export->exp_replay_count, 0);
851 export->exp_obd = obd;
852 INIT_LIST_HEAD(&export->exp_outstanding_replies);
853 spin_lock_init(&export->exp_uncommitted_replies_lock);
854 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
855 INIT_LIST_HEAD(&export->exp_req_replay_queue);
856 INIT_LIST_HEAD(&export->exp_handle.h_link);
857 INIT_LIST_HEAD(&export->exp_hp_rpcs);
858 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* register the export's handle; class_connect() hands out its h_cookie */
859 class_handle_hash(&export->exp_handle, &export_handle_ops);
860 export->exp_last_request_time = cfs_time_current_sec();
861 spin_lock_init(&export->exp_lock);
862 spin_lock_init(&export->exp_rpc_lock);
863 INIT_HLIST_NODE(&export->exp_uuid_hash);
864 INIT_HLIST_NODE(&export->exp_nid_hash);
865 INIT_HLIST_NODE(&export->exp_gen_hash);
866 spin_lock_init(&export->exp_bl_list_lock);
867 INIT_LIST_HEAD(&export->exp_bl_list);
869 export->exp_sp_peer = LUSTRE_SP_ANY;
870 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
871 export->exp_client_uuid = *cluuid;
872 obd_init_export(export);
874 spin_lock(&obd->obd_dev_lock);
875 /* shouldn't happen, but might race */
876 if (obd->obd_stopping)
877 GOTO(exit_unlock, rc = -ENODEV);
/* take a reference on the uuid hash; it is dropped with
 * cfs_hash_putref() on both the success and the error path */
879 hash = cfs_hash_getref(obd->obd_uuid_hash);
881 GOTO(exit_unlock, rc = -ENODEV);
882 spin_unlock(&obd->obd_dev_lock);
/* an export carrying the obd's own uuid is not entered in the uuid hash */
884 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
885 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
887 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
888 obd->obd_name, cluuid->uuid, rc);
889 GOTO(exit_err, rc = -EALREADY);
893 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* obd_stopping is re-checked now that obd_dev_lock is held again */
894 spin_lock(&obd->obd_dev_lock);
895 if (obd->obd_stopping) {
896 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
897 GOTO(exit_unlock, rc = -ENODEV);
/* link the export into the obd's lists and account for it */
900 class_incref(obd, "export", export);
901 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
902 list_add_tail(&export->exp_obd_chain_timed,
903 &export->exp_obd->obd_exports_timed);
904 export->exp_obd->obd_num_exports++;
905 spin_unlock(&obd->obd_dev_lock);
906 cfs_hash_putref(hash);
/* error paths: undo the handle registration and free the export */
910 spin_unlock(&obd->obd_dev_lock);
913 cfs_hash_putref(hash);
914 class_handle_unhash(&export->exp_handle);
915 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
916 obd_destroy_export(export);
917 OBD_FREE_PTR(export);
920 EXPORT_SYMBOL(class_new_export);
922 void class_unlink_export(struct obd_export *exp)
924 class_handle_unhash(&exp->exp_handle);
926 spin_lock(&exp->exp_obd->obd_dev_lock);
927 /* delete an uuid-export hashitem from hashtables */
928 if (!hlist_unhashed(&exp->exp_uuid_hash))
929 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
930 &exp->exp_client_uuid,
931 &exp->exp_uuid_hash);
933 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
934 list_del_init(&exp->exp_obd_chain_timed);
935 exp->exp_obd->obd_num_exports--;
936 spin_unlock(&exp->exp_obd->obd_dev_lock);
937 class_export_put(exp);
940 /* Import management functions */
/*
 * Final teardown of an import whose reference count is zero: release its
 * connection and every entry on imp_conn_list, drop the "import" reference
 * on the obd device and free the import with OBD_FREE_RCU().
 */
941 static void class_import_destroy(struct obd_import *imp)
945 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
946 imp->imp_obd->obd_name);
948 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
950 ptlrpc_put_connection_superhack(imp->imp_connection);
/* release every connection entry still on imp_conn_list */
952 while (!list_empty(&imp->imp_conn_list)) {
953 struct obd_import_conn *imp_conn;
955 imp_conn = list_entry(imp->imp_conn_list.next,
956 struct obd_import_conn, oic_item);
957 list_del_init(&imp_conn->oic_item);
958 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
959 OBD_FREE(imp_conn, sizeof(*imp_conn));
962 LASSERT(imp->imp_sec == NULL);
/* pairs with class_incref(obd, "import", imp) in class_new_import() */
963 class_decref(imp->imp_obd, "import", imp);
964 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Handle callback (see import_handle_ops): take a reference on the import
 * that the opaque handle object points to. */
static void import_handle_addref(void *import)
{
	struct obd_import *imp = import;

	class_import_get(imp);
}
/* Handle callbacks registered for imports through class_handle_hash() in
 * class_new_import(). */
973 static struct portals_handle_ops import_handle_ops = {
974 .hop_addref = import_handle_addref,
978 struct obd_import *class_import_get(struct obd_import *import)
980 atomic_inc(&import->imp_refcount);
981 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
982 atomic_read(&import->imp_refcount),
983 import->imp_obd->obd_name);
986 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp.  When the last reference goes away the
 * import is handed to obd_zombie_import_add() instead of being freed here.
 * NOTE(review): the final assertion reads imp->imp_refcount after the
 * import may already have been handed to the zombie list - confirm the
 * import cannot have been freed by then.
 */
988 void class_import_put(struct obd_import *imp)
/* an import that is already on the zombie list must not be put again */
992 LASSERT(list_empty(&imp->imp_zombie_chain));
993 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
995 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
996 atomic_read(&imp->imp_refcount) - 1,
997 imp->imp_obd->obd_name);
999 if (atomic_dec_and_test(&imp->imp_refcount)) {
1000 CDEBUG(D_INFO, "final put import %p\n", imp);
1001 obd_zombie_import_add(imp);
1004 /* catch possible import put race */
1005 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1008 EXPORT_SYMBOL(class_import_put);
/*
 * Initialize the adaptive-timeout state of an import: the network latency
 * estimate and one service estimate per portal (IMP_AT_MAX_PORTALS).
 * NOTE(review): the last at_init() argument for the service estimates is
 * on a line absent from this listing.
 */
1010 static void init_imp_at(struct imp_at *at) {
1012 at_init(&at->iat_net_latency, 0, 0);
1013 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1014 /* max service estimates are tracked on the server side, so
1015 don't use the AT history here, just use the last reported
1016 val. (But keep hist for proc histogram, worst_ever) */
1017 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialize a new import for \a obd.  The import takes an
 * "import" reference on \a obd, starts in state LUSTRE_IMP_NEW with a
 * reference count of 2 and is registered in the handle hash.
 * NOTE(review): the allocation-failure check and the final return are
 * absent from this listing.
 */
1022 struct obd_import *class_new_import(struct obd_device *obd)
1024 struct obd_import *imp;
1026 OBD_ALLOC(imp, sizeof(*imp));
1030 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1031 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1032 INIT_LIST_HEAD(&imp->imp_replay_list);
1033 INIT_LIST_HEAD(&imp->imp_sending_list);
1034 INIT_LIST_HEAD(&imp->imp_delayed_list);
1035 INIT_LIST_HEAD(&imp->imp_committed_list);
/* the replay cursor starts at the head of the (empty) committed list */
1036 imp->imp_replay_cursor = &imp->imp_committed_list;
1037 spin_lock_init(&imp->imp_lock);
1038 imp->imp_last_success_conn = 0;
1039 imp->imp_state = LUSTRE_IMP_NEW;
/* dropped again by class_decref() in class_import_destroy() */
1040 imp->imp_obd = class_incref(obd, "import", imp);
1041 mutex_init(&imp->imp_sec_mutex);
1042 init_waitqueue_head(&imp->imp_recovery_waitq);
1044 atomic_set(&imp->imp_refcount, 2);
1045 atomic_set(&imp->imp_unregistering, 0);
1046 atomic_set(&imp->imp_inflight, 0);
1047 atomic_set(&imp->imp_replay_inflight, 0);
1048 atomic_set(&imp->imp_inval_count, 0);
1049 INIT_LIST_HEAD(&imp->imp_conn_list);
1050 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1051 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1052 init_imp_at(&imp->imp_at);
1054 /* the default magic is V2, will be used in connect RPC, and
1055 * then adjusted according to the flags in request/reply. */
1056 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1060 EXPORT_SYMBOL(class_new_import);
1062 void class_destroy_import(struct obd_import *import)
1064 LASSERT(import != NULL);
1065 LASSERT(import != LP_POISON);
1067 class_handle_unhash(&import->imp_handle);
1069 spin_lock(&import->imp_lock);
1070 import->imp_generation++;
1071 spin_unlock(&import->imp_lock);
1072 class_import_put(import);
1074 EXPORT_SYMBOL(class_destroy_import);
1076 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1078 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1080 spin_lock(&exp->exp_locks_list_guard);
1082 LASSERT(lock->l_exp_refs_nr >= 0);
1084 if (lock->l_exp_refs_target != NULL &&
1085 lock->l_exp_refs_target != exp) {
1086 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1087 exp, lock, lock->l_exp_refs_target);
1089 if ((lock->l_exp_refs_nr ++) == 0) {
1090 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1091 lock->l_exp_refs_target = exp;
1093 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1094 lock, exp, lock->l_exp_refs_nr);
1095 spin_unlock(&exp->exp_locks_list_guard);
1098 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1100 spin_lock(&exp->exp_locks_list_guard);
1101 LASSERT(lock->l_exp_refs_nr > 0);
1102 if (lock->l_exp_refs_target != exp) {
1103 LCONSOLE_WARN("lock %p, "
1104 "mismatching export pointers: %p, %p\n",
1105 lock, lock->l_exp_refs_target, exp);
1107 if (-- lock->l_exp_refs_nr == 0) {
1108 list_del_init(&lock->l_exp_refs_link);
1109 lock->l_exp_refs_target = NULL;
1111 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1112 lock, exp, lock->l_exp_refs_nr);
1113 spin_unlock(&exp->exp_locks_list_guard);
1117 /* A connection defines an export context in which preallocation can
1118 be managed. This releases the export pointer reference, and returns
1119 the export handle, so the export refcount is 1 when this function
/* NOTE(review): the error check before RETURN(PTR_ERR(export)) and the
 * final return are absent from this listing. */
1121 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1122 struct obd_uuid *cluuid)
1124 struct obd_export *export;
1125 LASSERT(conn != NULL);
1126 LASSERT(obd != NULL);
1127 LASSERT(cluuid != NULL);
1130 export = class_new_export(obd, cluuid);
1132 RETURN(PTR_ERR(export));
/* hand the handle cookie back to the caller, then release the pointer
 * reference that class_new_export() returned */
1134 conn->cookie = export->exp_handle.h_cookie;
1135 class_export_put(export);
1137 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1138 cluuid->uuid, conn->cookie);
1141 EXPORT_SYMBOL(class_connect);
1143 /* if export is involved in recovery then clean up related things */
1144 static void class_export_recovery_cleanup(struct obd_export *exp)
1146 struct obd_device *obd = exp->exp_obd;
1148 spin_lock(&obd->obd_recovery_task_lock);
1149 if (obd->obd_recovering) {
1150 if (exp->exp_in_recovery) {
1151 spin_lock(&exp->exp_lock);
1152 exp->exp_in_recovery = 0;
1153 spin_unlock(&exp->exp_lock);
1154 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1155 atomic_dec(&obd->obd_connected_clients);
1158 /* if called during recovery then should update
1159 * obd_stale_clients counter,
1160 * lightweight exports are not counted */
1161 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1162 exp->exp_obd->obd_stale_clients++;
1164 spin_unlock(&obd->obd_recovery_task_lock);
1166 spin_lock(&exp->exp_lock);
1167 /** Cleanup req replay fields */
1168 if (exp->exp_req_replay_needed) {
1169 exp->exp_req_replay_needed = 0;
1171 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1172 atomic_dec(&obd->obd_req_replay_clients);
1175 /** Cleanup lock replay data */
1176 if (exp->exp_lock_replay_needed) {
1177 exp->exp_lock_replay_needed = 0;
1179 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1180 atomic_dec(&obd->obd_lock_replay_clients);
1182 spin_unlock(&exp->exp_lock);
1185 /* This function removes 1-3 references from the export:
1186 * 1 - for export pointer passed
1187 * and, if a disconnect is really needed:
1188 * 2 - removing from hash
1189 * 3 - in class_unlink_export
1190 * The export pointer passed to this function can be destroyed */
/* NOTE(review): the return statements and the no_disconn label are absent
 * from this listing. */
1191 int class_disconnect(struct obd_export *export)
1193 int already_disconnected;
1196 if (export == NULL) {
1197 CWARN("attempting to free NULL export %p\n", export);
/* test-and-set exp_disconnected under exp_lock so that only the first
 * caller performs the actual disconnect */
1201 spin_lock(&export->exp_lock);
1202 already_disconnected = export->exp_disconnected;
1203 export->exp_disconnected = 1;
1204 spin_unlock(&export->exp_lock);
1206 /* class_cleanup(), abort_recovery(), and class_fail_export()
1207 * all end up in here, and if any of them race we shouldn't
1208 * call extra class_export_puts(). */
1209 if (already_disconnected) {
1210 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1211 GOTO(no_disconn, already_disconnected);
1214 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1215 export->exp_handle.h_cookie);
/* remove the export from the nid hash if it was ever added */
1217 if (!hlist_unhashed(&export->exp_nid_hash))
1218 cfs_hash_del(export->exp_obd->obd_nid_hash,
1219 &export->exp_connection->c_peer.nid,
1220 &export->exp_nid_hash);
1222 class_export_recovery_cleanup(export);
1223 class_unlink_export(export);
/* drops the reference for the pointer that was passed in */
1225 class_export_put(export);
1228 EXPORT_SYMBOL(class_disconnect);
1230 /* Return non-zero for a fully connected export */
/* "Fully connected" means exp_conn_cnt > 0 and exp_failed not set; both
 * are sampled under exp_lock. */
1231 int class_connected_export(struct obd_export *exp)
1236 spin_lock(&exp->exp_lock);
1237 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1238 spin_unlock(&exp->exp_lock);
1242 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list, stamping each with \a flags first.
 * The loop runs until the list is empty.
 * NOTE(review): short lines (the list_entry() member argument, "continue",
 * braces) are absent from this listing.
 */
1244 static void class_disconnect_export_list(struct list_head *list,
1245 enum obd_option flags)
1248 struct obd_export *exp;
1251 /* It's possible that an export may disconnect itself, but
1252 * nothing else will be added to this list. */
1253 while (!list_empty(list)) {
1254 exp = list_entry(list->next, struct obd_export,
1256 /* need for safe call CDEBUG after obd_disconnect */
1257 class_export_get(exp);
1259 spin_lock(&exp->exp_lock);
1260 exp->exp_flags = flags;
1261 spin_unlock(&exp->exp_lock);
/* an export whose client uuid is the obd's own uuid is not disconnected,
 * only taken off the list */
1263 if (obd_uuid_equals(&exp->exp_client_uuid,
1264 &exp->exp_obd->obd_uuid)) {
1266 "exp %p export uuid == obd uuid, don't discon\n",
1268 /* Need to delete this now so we don't end up pointing
1269 * to work_list later when this export is cleaned up. */
1270 list_del_init(&exp->exp_obd_chain);
1271 class_export_put(exp);
/* extra reference for obd_disconnect(), which releases one (see the
 * comment before the call) */
1275 class_export_get(exp);
1276 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1277 "last request at "CFS_TIME_T"\n",
1278 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1279 exp, exp->exp_last_request_time);
1280 /* release one export reference anyway */
1281 rc = obd_disconnect(exp);
1283 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1284 obd_export_nid2str(exp), exp, rc);
/* pairs with the class_export_get() at the top of the loop body */
1285 class_export_put(exp);
1290 void class_disconnect_exports(struct obd_device *obd)
1292 struct list_head work_list;
1295 /* Move all of the exports from obd_exports to a work list, en masse. */
1296 INIT_LIST_HEAD(&work_list);
1297 spin_lock(&obd->obd_dev_lock);
1298 list_splice_init(&obd->obd_exports, &work_list);
1299 list_splice_init(&obd->obd_delayed_exports, &work_list);
1300 spin_unlock(&obd->obd_dev_lock);
1302 if (!list_empty(&work_list)) {
1303 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1304 "disconnecting them\n", obd->obd_minor, obd);
1305 class_disconnect_export_list(&work_list,
1306 exp_flags_from_obd(obd));
1308 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1309 obd->obd_minor, obd);
1312 EXPORT_SYMBOL(class_disconnect_exports);
1314 /* Remove exports that have not completed recovery.
1316 void class_disconnect_stale_exports(struct obd_device *obd,
1317 int (*test_export)(struct obd_export *))
1319 struct list_head work_list;
1320 struct obd_export *exp, *n;
1324 INIT_LIST_HEAD(&work_list);
1325 spin_lock(&obd->obd_dev_lock);
1326 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1328 /* don't count self-export as client */
1329 if (obd_uuid_equals(&exp->exp_client_uuid,
1330 &exp->exp_obd->obd_uuid))
1333 /* don't evict clients which have no slot in last_rcvd
1334 * (e.g. lightweight connection) */
1335 if (exp->exp_target_data.ted_lr_idx == -1)
1338 spin_lock(&exp->exp_lock);
1339 if (exp->exp_failed || test_export(exp)) {
1340 spin_unlock(&exp->exp_lock);
1343 exp->exp_failed = 1;
1344 spin_unlock(&exp->exp_lock);
1346 list_move(&exp->exp_obd_chain, &work_list);
1348 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1349 obd->obd_name, exp->exp_client_uuid.uuid,
1350 exp->exp_connection == NULL ? "<unknown>" :
1351 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1352 print_export_data(exp, "EVICTING", 0);
1354 spin_unlock(&obd->obd_dev_lock);
1357 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1358 obd->obd_name, evicted);
1360 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1361 OBD_OPT_ABORT_RECOV);
1364 EXPORT_SYMBOL(class_disconnect_stale_exports);
1366 void class_fail_export(struct obd_export *exp)
1368 int rc, already_failed;
1370 spin_lock(&exp->exp_lock);
1371 already_failed = exp->exp_failed;
1372 exp->exp_failed = 1;
1373 spin_unlock(&exp->exp_lock);
1375 if (already_failed) {
1376 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1377 exp, exp->exp_client_uuid.uuid);
1381 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1382 exp, exp->exp_client_uuid.uuid);
1384 if (obd_dump_on_timeout)
1385 libcfs_debug_dumplog();
1387 /* need for safe call CDEBUG after obd_disconnect */
1388 class_export_get(exp);
1390 /* Most callers into obd_disconnect are removing their own reference
1391 * (request, for example) in addition to the one from the hash table.
1392 * We don't have such a reference here, so make one. */
1393 class_export_get(exp);
1394 rc = obd_disconnect(exp);
1396 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1398 CDEBUG(D_HA, "disconnected export %p/%s\n",
1399 exp, exp->exp_client_uuid.uuid);
1400 class_export_put(exp);
1402 EXPORT_SYMBOL(class_fail_export);
1404 char *obd_export_nid2str(struct obd_export *exp)
1406 if (exp->exp_connection != NULL)
1407 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1411 EXPORT_SYMBOL(obd_export_nid2str);
1413 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1415 struct cfs_hash *nid_hash;
1416 struct obd_export *doomed_exp = NULL;
1417 int exports_evicted = 0;
1419 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1421 spin_lock(&obd->obd_dev_lock);
1422 /* umount has run already, so evict thread should leave
1423 * its task to umount thread now */
1424 if (obd->obd_stopping) {
1425 spin_unlock(&obd->obd_dev_lock);
1426 return exports_evicted;
1428 nid_hash = obd->obd_nid_hash;
1429 cfs_hash_getref(nid_hash);
1430 spin_unlock(&obd->obd_dev_lock);
1433 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1434 if (doomed_exp == NULL)
1437 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1438 "nid %s found, wanted nid %s, requested nid %s\n",
1439 obd_export_nid2str(doomed_exp),
1440 libcfs_nid2str(nid_key), nid);
1441 LASSERTF(doomed_exp != obd->obd_self_export,
1442 "self-export is hashed by NID?\n");
1444 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1445 "request\n", obd->obd_name,
1446 obd_uuid2str(&doomed_exp->exp_client_uuid),
1447 obd_export_nid2str(doomed_exp));
1448 class_fail_export(doomed_exp);
1449 class_export_put(doomed_exp);
1452 cfs_hash_putref(nid_hash);
1454 if (!exports_evicted)
1455 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1456 obd->obd_name, nid);
1457 return exports_evicted;
1459 EXPORT_SYMBOL(obd_export_evict_by_nid);
1461 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1463 struct cfs_hash *uuid_hash;
1464 struct obd_export *doomed_exp = NULL;
1465 struct obd_uuid doomed_uuid;
1466 int exports_evicted = 0;
1468 spin_lock(&obd->obd_dev_lock);
1469 if (obd->obd_stopping) {
1470 spin_unlock(&obd->obd_dev_lock);
1471 return exports_evicted;
1473 uuid_hash = obd->obd_uuid_hash;
1474 cfs_hash_getref(uuid_hash);
1475 spin_unlock(&obd->obd_dev_lock);
1477 obd_str2uuid(&doomed_uuid, uuid);
1478 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1479 CERROR("%s: can't evict myself\n", obd->obd_name);
1480 cfs_hash_putref(uuid_hash);
1481 return exports_evicted;
1484 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1486 if (doomed_exp == NULL) {
1487 CERROR("%s: can't disconnect %s: no exports found\n",
1488 obd->obd_name, uuid);
1490 CWARN("%s: evicting %s at adminstrative request\n",
1491 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1492 class_fail_export(doomed_exp);
1493 class_export_put(doomed_exp);
1496 cfs_hash_putref(uuid_hash);
1498 return exports_evicted;
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() when lock dumping is
 * requested; NULL means no hook is installed. */
void (*class_export_dump_hook)(struct obd_export*) = NULL;
#endif
1505 static void print_export_data(struct obd_export *exp, const char *status,
1508 struct ptlrpc_reply_state *rs;
1509 struct ptlrpc_reply_state *first_reply = NULL;
1512 spin_lock(&exp->exp_lock);
1513 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1519 spin_unlock(&exp->exp_lock);
1521 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1522 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1523 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1524 atomic_read(&exp->exp_rpc_count),
1525 atomic_read(&exp->exp_cb_count),
1526 atomic_read(&exp->exp_locks_count),
1527 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1528 nreplies, first_reply, nreplies > 3 ? "..." : "",
1529 exp->exp_last_committed);
1530 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1531 if (locks && class_export_dump_hook != NULL)
1532 class_export_dump_hook(exp);
1536 void dump_exports(struct obd_device *obd, int locks)
1538 struct obd_export *exp;
1540 spin_lock(&obd->obd_dev_lock);
1541 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1542 print_export_data(exp, "ACTIVE", locks);
1543 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1544 print_export_data(exp, "UNLINKED", locks);
1545 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1546 print_export_data(exp, "DELAYED", locks);
1547 spin_unlock(&obd->obd_dev_lock);
1548 spin_lock(&obd_zombie_impexp_lock);
1549 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1550 print_export_data(exp, "ZOMBIE", locks);
1551 spin_unlock(&obd_zombie_impexp_lock);
1554 void obd_exports_barrier(struct obd_device *obd)
1557 LASSERT(list_empty(&obd->obd_exports));
1558 spin_lock(&obd->obd_dev_lock);
1559 while (!list_empty(&obd->obd_unlinked_exports)) {
1560 spin_unlock(&obd->obd_dev_lock);
1561 set_current_state(TASK_UNINTERRUPTIBLE);
1562 schedule_timeout(cfs_time_seconds(waited));
1563 if (waited > 5 && IS_PO2(waited)) {
1564 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1565 "more than %d seconds. "
1566 "The obd refcount = %d. Is it stuck?\n",
1567 obd->obd_name, waited,
1568 atomic_read(&obd->obd_refcount));
1569 dump_exports(obd, 1);
1572 spin_lock(&obd->obd_dev_lock);
1574 spin_unlock(&obd->obd_dev_lock);
1576 EXPORT_SYMBOL(obd_exports_barrier);
/* Total number of zombie imports and exports still waiting to be destroyed;
 * read and written under obd_zombie_impexp_lock. */
static int zombies_count;
1582 * kill zombie imports and exports
1584 void obd_zombie_impexp_cull(void)
1586 struct obd_import *import;
1587 struct obd_export *export;
1591 spin_lock(&obd_zombie_impexp_lock);
1594 if (!list_empty(&obd_zombie_imports)) {
1595 import = list_entry(obd_zombie_imports.next,
1598 list_del_init(&import->imp_zombie_chain);
1602 if (!list_empty(&obd_zombie_exports)) {
1603 export = list_entry(obd_zombie_exports.next,
1606 list_del_init(&export->exp_obd_chain);
1609 spin_unlock(&obd_zombie_impexp_lock);
1611 if (import != NULL) {
1612 class_import_destroy(import);
1613 spin_lock(&obd_zombie_impexp_lock);
1615 spin_unlock(&obd_zombie_impexp_lock);
1618 if (export != NULL) {
1619 class_export_destroy(export);
1620 spin_lock(&obd_zombie_impexp_lock);
1622 spin_unlock(&obd_zombie_impexp_lock);
1626 } while (import != NULL || export != NULL);
1630 static struct completion obd_zombie_start;
1631 static struct completion obd_zombie_stop;
1632 static unsigned long obd_zombie_flags;
1633 static wait_queue_head_t obd_zombie_waitq;
1634 static pid_t obd_zombie_pid;
1637 OBD_ZOMBIE_STOP = 0x0001,
1641 * check for work for kill zombie import/export thread.
1643 static int obd_zombie_impexp_check(void *arg)
1647 spin_lock(&obd_zombie_impexp_lock);
1648 rc = (zombies_count == 0) &&
1649 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1650 spin_unlock(&obd_zombie_impexp_lock);
1656 * Add export to the obd_zombe thread and notify it.
1658 static void obd_zombie_export_add(struct obd_export *exp) {
1659 spin_lock(&exp->exp_obd->obd_dev_lock);
1660 LASSERT(!list_empty(&exp->exp_obd_chain));
1661 list_del_init(&exp->exp_obd_chain);
1662 spin_unlock(&exp->exp_obd->obd_dev_lock);
1663 spin_lock(&obd_zombie_impexp_lock);
1665 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1666 spin_unlock(&obd_zombie_impexp_lock);
1668 obd_zombie_impexp_notify();
1672 * Add import to the obd_zombe thread and notify it.
1674 static void obd_zombie_import_add(struct obd_import *imp) {
1675 LASSERT(imp->imp_sec == NULL);
1676 spin_lock(&obd_zombie_impexp_lock);
1677 LASSERT(list_empty(&imp->imp_zombie_chain));
1679 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1680 spin_unlock(&obd_zombie_impexp_lock);
1682 obd_zombie_impexp_notify();
1686 * notify import/export destroy thread about new zombie.
1688 static void obd_zombie_impexp_notify(void)
1691 * Make sure obd_zomebie_impexp_thread get this notification.
1692 * It is possible this signal only get by obd_zombie_barrier, and
1693 * barrier gulps this notification and sleeps away and hangs ensues
1695 wake_up_all(&obd_zombie_waitq);
1699 * check whether obd_zombie is idle
1701 static int obd_zombie_is_idle(void)
1705 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1706 spin_lock(&obd_zombie_impexp_lock);
1707 rc = (zombies_count == 0);
1708 spin_unlock(&obd_zombie_impexp_lock);
1713 * wait when obd_zombie import/export queues become empty
1715 void obd_zombie_barrier(void)
1717 struct l_wait_info lwi = { 0 };
1719 if (obd_zombie_pid == current_pid())
1720 /* don't wait for myself */
1722 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1724 EXPORT_SYMBOL(obd_zombie_barrier);
1728 * destroy zombie export/import thread.
1730 static int obd_zombie_impexp_thread(void *unused)
1732 unshare_fs_struct();
1733 complete(&obd_zombie_start);
1735 obd_zombie_pid = current_pid();
1737 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1738 struct l_wait_info lwi = { 0 };
1740 l_wait_event(obd_zombie_waitq,
1741 !obd_zombie_impexp_check(NULL), &lwi);
1742 obd_zombie_impexp_cull();
1745 * Notify obd_zombie_barrier callers that queues
1748 wake_up(&obd_zombie_waitq);
1751 complete(&obd_zombie_stop);
1758 * start destroy zombie import/export thread
1760 int obd_zombie_impexp_init(void)
1762 struct task_struct *task;
1764 INIT_LIST_HEAD(&obd_zombie_imports);
1766 INIT_LIST_HEAD(&obd_zombie_exports);
1767 spin_lock_init(&obd_zombie_impexp_lock);
1768 init_completion(&obd_zombie_start);
1769 init_completion(&obd_zombie_stop);
1770 init_waitqueue_head(&obd_zombie_waitq);
1773 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1775 RETURN(PTR_ERR(task));
1777 wait_for_completion(&obd_zombie_start);
1781 * stop destroy zombie import/export thread
1783 void obd_zombie_impexp_stop(void)
1785 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1786 obd_zombie_impexp_notify();
1787 wait_for_completion(&obd_zombie_stop);
/***** Kernel-userspace comm helpers *******/
1792 /* Get length of entire message, including header */
1793 int kuc_len(int payload_len)
1795 return sizeof(struct kuc_hdr) + payload_len;
1797 EXPORT_SYMBOL(kuc_len);
1799 /* Get a pointer to kuc header, given a ptr to the payload
1800 * @param p Pointer to payload area
1801 * @returns Pointer to kuc header
1803 struct kuc_hdr * kuc_ptr(void *p)
1805 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1806 LASSERT(lh->kuc_magic == KUC_MAGIC);
1809 EXPORT_SYMBOL(kuc_ptr);
1811 /* Test if payload is part of kuc message
1812 * @param p Pointer to payload area
1815 int kuc_ispayload(void *p)
1817 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1819 if (kh->kuc_magic == KUC_MAGIC)
1824 EXPORT_SYMBOL(kuc_ispayload);
1826 /* Alloc space for a message, and fill in header
1827 * @return Pointer to payload area
1829 void *kuc_alloc(int payload_len, int transport, int type)
1832 int len = kuc_len(payload_len);
1836 return ERR_PTR(-ENOMEM);
1838 lh->kuc_magic = KUC_MAGIC;
1839 lh->kuc_transport = transport;
1840 lh->kuc_msgtype = type;
1841 lh->kuc_msglen = len;
1843 return (void *)(lh + 1);
1845 EXPORT_SYMBOL(kuc_alloc);
/**
 * Free a message allocated with kuc_alloc().
 *
 * Not declared inline: the function is exported, so an out-of-line
 * definition must exist for other modules to link against.
 *
 * \param p		pointer to the payload area (not to the header)
 * \param payload_len	payload size that was passed to kuc_alloc()
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);

	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1855 struct obd_request_slot_waiter {
1856 struct list_head orsw_entry;
1857 wait_queue_head_t orsw_waitq;
1861 static bool obd_request_slot_avail(struct client_obd *cli,
1862 struct obd_request_slot_waiter *orsw)
1866 spin_lock(&cli->cl_loi_list_lock);
1867 avail = !!list_empty(&orsw->orsw_entry);
1868 spin_unlock(&cli->cl_loi_list_lock);
1874 * For network flow control, the RPC sponsor needs to acquire a credit
1875 * before sending the RPC. The credits count for a connection is defined
1876 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1877 * the subsequent RPC sponsors need to wait until others released their
1878 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1880 int obd_get_request_slot(struct client_obd *cli)
1882 struct obd_request_slot_waiter orsw;
1883 struct l_wait_info lwi;
1886 spin_lock(&cli->cl_loi_list_lock);
1887 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1888 cli->cl_r_in_flight++;
1889 spin_unlock(&cli->cl_loi_list_lock);
1893 init_waitqueue_head(&orsw.orsw_waitq);
1894 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1895 orsw.orsw_signaled = false;
1896 spin_unlock(&cli->cl_loi_list_lock);
1898 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1899 rc = l_wait_event(orsw.orsw_waitq,
1900 obd_request_slot_avail(cli, &orsw) ||
1904 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1905 * freed but other (such as obd_put_request_slot) is using it. */
1906 spin_lock(&cli->cl_loi_list_lock);
1908 if (!orsw.orsw_signaled) {
1909 if (list_empty(&orsw.orsw_entry))
1910 cli->cl_r_in_flight--;
1912 list_del(&orsw.orsw_entry);
1916 if (orsw.orsw_signaled) {
1917 LASSERT(list_empty(&orsw.orsw_entry));
1921 spin_unlock(&cli->cl_loi_list_lock);
1925 EXPORT_SYMBOL(obd_get_request_slot);
1927 void obd_put_request_slot(struct client_obd *cli)
1929 struct obd_request_slot_waiter *orsw;
1931 spin_lock(&cli->cl_loi_list_lock);
1932 cli->cl_r_in_flight--;
1934 /* If there is free slot, wakeup the first waiter. */
1935 if (!list_empty(&cli->cl_loi_read_list) &&
1936 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1937 orsw = list_entry(cli->cl_loi_read_list.next,
1938 struct obd_request_slot_waiter, orsw_entry);
1939 list_del_init(&orsw->orsw_entry);
1940 cli->cl_r_in_flight++;
1941 wake_up(&orsw->orsw_waitq);
1943 spin_unlock(&cli->cl_loi_list_lock);
1945 EXPORT_SYMBOL(obd_put_request_slot);
1947 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1949 return cli->cl_max_rpcs_in_flight;
1951 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1953 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1955 struct obd_request_slot_waiter *orsw;
1962 if (max > OBD_MAX_RIF_MAX || max < 1)
1965 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
1966 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
1967 /* adjust max_mod_rpcs_in_flight to ensure it is always
1968 * strictly lower that max_rpcs_in_flight */
1970 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
1971 "because it must be higher than "
1972 "max_mod_rpcs_in_flight value",
1973 cli->cl_import->imp_obd->obd_name);
1976 if (max <= cli->cl_max_mod_rpcs_in_flight) {
1977 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
1983 spin_lock(&cli->cl_loi_list_lock);
1984 old = cli->cl_max_rpcs_in_flight;
1985 cli->cl_max_rpcs_in_flight = max;
1988 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1989 for (i = 0; i < diff; i++) {
1990 if (list_empty(&cli->cl_loi_read_list))
1993 orsw = list_entry(cli->cl_loi_read_list.next,
1994 struct obd_request_slot_waiter, orsw_entry);
1995 list_del_init(&orsw->orsw_entry);
1996 cli->cl_r_in_flight++;
1997 wake_up(&orsw->orsw_waitq);
1999 spin_unlock(&cli->cl_loi_list_lock);
2003 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2005 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2007 return cli->cl_max_mod_rpcs_in_flight;
2009 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2011 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2013 struct obd_connect_data *ocd;
2017 if (max > OBD_MAX_RIF_MAX || max < 1)
2020 /* cannot exceed or equal max_rpcs_in_flight */
2021 if (max >= cli->cl_max_rpcs_in_flight) {
2022 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2023 "higher or equal to max_rpcs_in_flight value (%u)\n",
2024 cli->cl_import->imp_obd->obd_name,
2025 max, cli->cl_max_rpcs_in_flight);
2029 /* cannot exceed max modify RPCs in flight supported by the server */
2030 ocd = &cli->cl_import->imp_connect_data;
2031 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2032 maxmodrpcs = ocd->ocd_maxmodrpcs;
2035 if (max > maxmodrpcs) {
2036 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2037 "higher than max_mod_rpcs_per_client value (%hu) "
2038 "returned by the server at connection\n",
2039 cli->cl_import->imp_obd->obd_name,
2044 spin_lock(&cli->cl_mod_rpcs_lock);
2046 prev = cli->cl_max_mod_rpcs_in_flight;
2047 cli->cl_max_mod_rpcs_in_flight = max;
2049 /* wakeup waiters if limit has been increased */
2050 if (cli->cl_max_mod_rpcs_in_flight > prev)
2051 wake_up(&cli->cl_mod_rpcs_waitq);
2053 spin_unlock(&cli->cl_mod_rpcs_lock);
2057 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/* Integer percentage of a over b, 0 when b is 0.  Arguments are fully
 * parenthesized so that expression arguments keep their intended grouping;
 * b is evaluated twice, so it must be free of side effects. */
#define pct(a, b) ((b) ? (a) * 100 / (b) : 0)
2061 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2062 struct seq_file *seq)
2065 unsigned long mod_tot = 0, mod_cum;
2068 do_gettimeofday(&now);
2070 spin_lock(&cli->cl_mod_rpcs_lock);
2072 seq_printf(seq, "snapshot_time: %lu.%lu (secs.usecs)\n",
2073 now.tv_sec, now.tv_usec);
2074 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2075 cli->cl_mod_rpcs_in_flight);
2077 seq_printf(seq, "\n\t\t\tmodify\n");
2078 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2080 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2083 for (i = 0; i < OBD_HIST_MAX; i++) {
2084 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2086 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2087 i, mod, pct(mod, mod_tot),
2088 pct(mod_cum, mod_tot));
2089 if (mod_cum == mod_tot)
2093 spin_unlock(&cli->cl_mod_rpcs_lock);
2097 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2101 /* The number of modify RPCs sent in parallel is limited
2102 * because the server has a finite number of slots per client to
2103 * store request result and ensure reply reconstruction when needed.
2104 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2105 * that takes into account server limit and cl_max_rpcs_in_flight
2107 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2108 * one close request is allowed above the maximum.
2110 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2115 /* A slot is available if
2116 * - number of modify RPCs in flight is less than the max
2117 * - it's a close RPC and no other close request is in flight
2119 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2120 (close_req && cli->cl_close_rpcs_in_flight == 0);
2125 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2130 spin_lock(&cli->cl_mod_rpcs_lock);
2131 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2132 spin_unlock(&cli->cl_mod_rpcs_lock);
2136 /* Get a modify RPC slot from the obd client @cli according
2137 * to the kind of operation @opc that is going to be sent
2138 * and the intent @it of the operation if it applies.
2139 * If the maximum number of modify RPCs in flight is reached
2140 * the thread is put to sleep.
2141 * Returns the tag to be set in the request message. Tag 0
2142 * is reserved for non-modifying requests.
2144 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2145 struct lookup_intent *it)
2147 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2148 bool close_req = false;
2151 /* read-only metadata RPCs don't consume a slot on MDT
2152 * for reply reconstruction
2154 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2155 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2158 if (opc == MDS_CLOSE)
2162 spin_lock(&cli->cl_mod_rpcs_lock);
2163 max = cli->cl_max_mod_rpcs_in_flight;
2164 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2165 /* there is a slot available */
2166 cli->cl_mod_rpcs_in_flight++;
2168 cli->cl_close_rpcs_in_flight++;
2169 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2170 cli->cl_mod_rpcs_in_flight);
2171 /* find a free tag */
2172 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2174 LASSERT(i < OBD_MAX_RIF_MAX);
2175 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2176 spin_unlock(&cli->cl_mod_rpcs_lock);
2177 /* tag 0 is reserved for non-modify RPCs */
2180 spin_unlock(&cli->cl_mod_rpcs_lock);
2182 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2183 "opc %u, max %hu\n",
2184 cli->cl_import->imp_obd->obd_name, opc, max);
2186 l_wait_event(cli->cl_mod_rpcs_waitq,
2187 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2190 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2192 /* Put a modify RPC slot from the obd client @cli according
2193 * to the kind of operation @opc that has been sent and the
2194 * intent @it of the operation if it applies.
2196 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2197 struct lookup_intent *it, __u16 tag)
2199 bool close_req = false;
2201 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2202 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2205 if (opc == MDS_CLOSE)
2208 spin_lock(&cli->cl_mod_rpcs_lock);
2209 cli->cl_mod_rpcs_in_flight--;
2211 cli->cl_close_rpcs_in_flight--;
2212 /* release the tag in the bitmap */
2213 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2214 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2215 spin_unlock(&cli->cl_mod_rpcs_lock);
2216 wake_up(&cli->cl_mod_rpcs_waitq);
2218 EXPORT_SYMBOL(obd_put_mod_rpc_slot);