1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50 #include <lustre_export.h>
/* List of registered obd types; only declared here. */
52 extern struct list_head obd_types;
/* Held in this file while obd_types is walked or modified. */
53 spinlock_t obd_types_lock;
/* Slab caches set up by obd_init_caches() and torn down by
 * obd_cleanup_caches(). */
55 cfs_mem_cache_t *obd_device_cachep;
56 cfs_mem_cache_t *obdo_cachep;
57 EXPORT_SYMBOL(obdo_cachep);
58 cfs_mem_cache_t *import_cachep;
/* Imports and exports whose refcount dropped to zero are queued on these
 * lists (class_import_put() / __class_export_put()) under
 * obd_zombie_impexp_lock. */
60 struct list_head obd_zombie_imports;
61 struct list_head obd_zombie_exports;
62 spinlock_t obd_zombie_impexp_lock;
63 static void obd_zombie_impexp_notify(void);
/* Function pointer called with a ptlrpc_connection when an export or
 * import is destroyed. */
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 * support functions: we could use inter-module communication, but this
69 * is more portable to other OS's
/*
 * Allocate an obd_device from obd_device_cachep and stamp it with
 * OBD_DEVICE_MAGIC.
 * NOTE(review): declared static yet handed to EXPORT_SYMBOL() right after
 * the function -- confirm which linkage is intended.
 */
71 static struct obd_device *obd_device_alloc(void)
73 struct obd_device *obd;
75 OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
77 obd->obd_magic = OBD_DEVICE_MAGIC;
81 EXPORT_SYMBOL(obd_device_alloc);
/*
 * Return an obd_device to obd_device_cachep.
 * Asserts that obd_magic is intact and logs an error when obd_namespace is
 * still set at this point.
 */
83 static void obd_device_free(struct obd_device *obd)
86 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic "
87 "%08x != %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88 if (obd->obd_namespace != NULL) {
89 CERROR("obd %p: namespace %p was not properly cleaned up "
91 obd, obd->obd_namespace, obd->obd_force);
94 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 EXPORT_SYMBOL(obd_device_free);
/*
 * Look up a registered obd type by exact name.
 * Walks obd_types under obd_types_lock and compares typ_name with strcmp().
 * Callers in this file treat a non-NULL result as "found".
 */
98 struct obd_type *class_search_type(const char *name)
100 struct list_head *tmp;
101 struct obd_type *type;
103 spin_lock(&obd_types_lock);
104 list_for_each(tmp, &obd_types) {
105 type = list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
107 spin_unlock(&obd_types_lock);
111 spin_unlock(&obd_types_lock);
/*
 * Find the obd type called \a name and take a module reference on its
 * owner (try_module_get() under obd_type_lock).
 * When a module load is attempted, LUSTRE_MDT_NAME is mapped to
 * LUSTRE_MDS_NAME before request_module(); after a successful load the
 * type is searched for again.
 */
115 struct obd_type *class_get_type(const char *name)
117 struct obd_type *type = class_search_type(name);
121 const char *modname = name;
122 if (strcmp(modname, LUSTRE_MDT_NAME) == 0)
123 modname = LUSTRE_MDS_NAME;
124 if (!request_module(modname)) {
125 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126 type = class_search_type(name);
128 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134 spin_lock(&type->obd_type_lock);
136 try_module_get(type->typ_ops->o_owner);
137 spin_unlock(&type->obd_type_lock);
/*
 * Release a module reference on the type's owner under obd_type_lock; the
 * counterpart of the try_module_get() in class_get_type().
 */
142 void class_put_type(struct obd_type *type)
145 spin_lock(&type->obd_type_lock);
147 module_put(type->typ_ops->o_owner);
148 spin_unlock(&type->obd_type_lock);
/*
 * Register a new obd type.
 * A name that is already registered is detected first. Otherwise \a ops and
 * \a name are copied into freshly allocated storage, a proc directory is
 * registered under proc_lustre_root, and the type is linked onto obd_types
 * under obd_types_lock.
 * The frees at the end release name, ops and the type itself -- presumably
 * the failure path; its label is not visible here.
 */
151 int class_register_type(struct obd_ops *ops, struct lprocfs_vars *vars,
154 struct obd_type *type;
158 LASSERT(strnlen(name, 1024) < 1024); /* sanity check */
160 if (class_search_type(name)) {
161 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
166 OBD_ALLOC(type, sizeof(*type));
170 OBD_ALLOC(type->typ_ops, sizeof(*type->typ_ops));
171 OBD_ALLOC(type->typ_name, strlen(name) + 1);
172 if (type->typ_ops == NULL || type->typ_name == NULL)
175 *(type->typ_ops) = *ops;
176 strcpy(type->typ_name, name);
177 spin_lock_init(&type->obd_type_lock);
180 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
182 if (IS_ERR(type->typ_procroot)) {
183 rc = PTR_ERR(type->typ_procroot);
184 type->typ_procroot = NULL;
189 spin_lock(&obd_types_lock);
190 list_add(&type->typ_chain, &obd_types);
191 spin_unlock(&obd_types_lock);
196 if (type->typ_name != NULL)
197 OBD_FREE(type->typ_name, strlen(name) + 1);
198 if (type->typ_ops != NULL)
199 OBD_FREE (type->typ_ops, sizeof (*type->typ_ops));
200 OBD_FREE(type, sizeof(*type));
/*
 * Unregister the obd type called \a name.
 * A type that still has references gets its ops freed while its name is
 * kept (see the in-line comments). Otherwise the proc entry is removed, the
 * type is unlinked from obd_types under obd_types_lock, and name, ops and
 * the type itself are freed.
 */
204 int class_unregister_type(const char *name)
206 struct obd_type *type = class_search_type(name);
210 CERROR("unknown obd type\n");
214 if (type->typ_refcnt) {
215 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
216 /* This is a bad situation, let's make the best of it */
217 /* Remove ops, but leave the name for debugging */
218 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
222 if (type->typ_procroot)
223 lprocfs_remove(&type->typ_procroot);
225 spin_lock(&obd_types_lock);
226 list_del(&type->typ_chain);
227 spin_unlock(&obd_types_lock);
228 OBD_FREE(type->typ_name, strlen(name) + 1);
229 if (type->typ_ops != NULL)
230 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
231 OBD_FREE(type, sizeof(*type));
233 } /* class_unregister_type */
236 * Create a new obd device.
238 * Find an empty slot in ::obd_devs[], create a new obd device in it.
240 * \param type_name [in] obd device type string.
241 * \param name [in] obd device name.
243 * \retval ERR_PTR(errno) if create fails, otherwise return the obd device
/*
 * Allocate a device of type \a type_name called \a name and install it in
 * a free obd_devs[] slot.
 * Failures visible here are reported as ERR_PTR(): -EINVAL (name too long),
 * -ENODEV (unknown type), -ENOMEM, -EEXIST (duplicate name) and -EOVERFLOW
 * (no free slot). When the result is an error the new device is freed and
 * the type reference is dropped.
 */
246 struct obd_device *class_newdev(const char *type_name, const char *name)
248 struct obd_device *result = NULL;
249 struct obd_device *newdev;
250 struct obd_type *type = NULL;
252 int new_obd_minor = 0;
254 if (strlen(name) >= MAX_OBD_NAME) {
255 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
256 RETURN(ERR_PTR(-EINVAL));
259 type = class_get_type(type_name);
261 CERROR("OBD: unknown type: %s\n", type_name);
262 RETURN(ERR_PTR(-ENODEV));
265 newdev = obd_device_alloc();
266 if (newdev == NULL) {
267 class_put_type(type);
268 RETURN(ERR_PTR(-ENOMEM));
270 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* obd_dev_lock covers both the duplicate-name scan and the slot claim. */
272 spin_lock(&obd_dev_lock);
273 for (i = 0; i < class_devno_max(); i++) {
274 struct obd_device *obd = class_num2obd(i);
275 if (obd && obd->obd_name && (strcmp(name, obd->obd_name) == 0)){
276 CERROR("Device %s already exists, won't add\n", name);
/* Releases a slot recorded in result; presumably only reached when a slot
 * was claimed earlier in this scan -- the guard is not visible here. */
278 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
279 "%p obd_magic %08x != %08x\n", result,
280 result->obd_magic, OBD_DEVICE_MAGIC);
281 LASSERTF(result->obd_minor == new_obd_minor,
282 "%p obd_minor %d != %d\n", result,
283 result->obd_minor, new_obd_minor);
285 obd_devs[result->obd_minor] = NULL;
286 result->obd_name[0]='\0';
288 result = ERR_PTR(-EEXIST);
291 if (!result && !obd) {
293 result->obd_minor = i;
295 result->obd_type = type;
/* NOTE(review): strncpy() does not terminate on truncation; this relies on
 * the strlen(name) < MAX_OBD_NAME check above and assumes obd_name holds
 * MAX_OBD_NAME bytes and starts zeroed -- confirm. */
296 strncpy(result->obd_name, name,
297 sizeof(result->obd_name) - 1);
298 obd_devs[i] = result;
301 spin_unlock(&obd_dev_lock);
303 if (result == NULL && i >= class_devno_max()) {
304 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
306 result = ERR_PTR(-EOVERFLOW);
309 if (IS_ERR(result)) {
310 obd_device_free(newdev);
311 class_put_type(type);
313 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
314 result->obd_name, result);
/*
 * Remove \a obd from obd_devs[] (under obd_dev_lock), free it, then drop
 * its type reference.
 * obd_type is read up front because it is still needed after obd has been
 * freed.
 */
319 void class_release_dev(struct obd_device *obd)
321 struct obd_type *obd_type = obd->obd_type;
323 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
324 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
325 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
326 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
327 LASSERT(obd_type != NULL);
329 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
330 obd->obd_name,obd->obd_type->typ_name);
332 spin_lock(&obd_dev_lock);
333 obd_devs[obd->obd_minor] = NULL;
334 spin_unlock(&obd_dev_lock);
335 obd_device_free(obd);
337 class_put_type(obd_type);
/*
 * Scan obd_devs[] under obd_dev_lock for a device named \a name.
 * A match is only honoured once obd_attached is set.
 * NOTE(review): obd_name looks like an array (class_newdev() writes
 * obd_name[0] and takes sizeof(obd_name)); if so, the `obd->obd_name` test
 * is always true.
 */
340 int class_name2dev(const char *name)
347 spin_lock(&obd_dev_lock);
348 for (i = 0; i < class_devno_max(); i++) {
349 struct obd_device *obd = class_num2obd(i);
350 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
351 /* Make sure we finished attaching before we give
352 out any references */
353 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
354 if (obd->obd_attached) {
355 spin_unlock(&obd_dev_lock);
361 spin_unlock(&obd_dev_lock);
/*
 * Resolve a device name to its obd_device via class_name2dev() and
 * class_num2obd().
 * NOTE(review): the upper bound uses `>`, so dev == class_devno_max() is
 * passed on to class_num2obd(), which only looks up
 * num < class_devno_max(); `>=` looks intended.
 */
366 struct obd_device *class_name2obd(const char *name)
368 int dev = class_name2dev(name);
370 if (dev < 0 || dev > class_devno_max())
372 return class_num2obd(dev);
/*
 * Scan obd_devs[] under obd_dev_lock for a device whose obd_uuid equals
 * \a uuid.
 */
375 int class_uuid2dev(struct obd_uuid *uuid)
379 spin_lock(&obd_dev_lock);
380 for (i = 0; i < class_devno_max(); i++) {
381 struct obd_device *obd = class_num2obd(i);
382 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
383 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
384 spin_unlock(&obd_dev_lock);
388 spin_unlock(&obd_dev_lock);
/* Resolve a uuid to its obd_device via class_uuid2dev() and class_num2obd(). */
393 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
395 int dev = class_uuid2dev(uuid);
398 return class_num2obd(dev);
402 * Get obd device from ::obd_devs[]
404 * \param num [in] array index
406 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
407 * otherwise return the obd device there.
/*
 * Only indices below class_devno_max() are looked up. The asserts require
 * the device to carry OBD_DEVICE_MAGIC and to have obd_minor equal to
 * \a num.
 * NOTE(review): no lower bound on \a num is visible here -- confirm callers
 * never pass a negative index.
 */
409 struct obd_device *class_num2obd(int num)
411 struct obd_device *obd = NULL;
413 if (num < class_devno_max()) {
418 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
419 "%p obd_magic %08x != %08x\n",
420 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
421 LASSERTF(obd->obd_minor == num,
422 "%p obd_minor %0d != %0d\n",
423 obd, obd->obd_minor, num);
/*
 * Print one console line per device in obd_devs[]: index, a status string
 * chosen from obd_stopping / obd_set_up / obd_attached, type name, device
 * name, uuid and reference count. The scan runs under obd_dev_lock.
 */
429 void class_obd_list(void)
434 spin_lock(&obd_dev_lock);
435 for (i = 0; i < class_devno_max(); i++) {
436 struct obd_device *obd = class_num2obd(i);
439 if (obd->obd_stopping)
441 else if (obd->obd_set_up)
443 else if (obd->obd_attached)
447 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
448 i, status, obd->obd_type->typ_name,
449 obd->obd_name, obd->obd_uuid.uuid,
450 atomic_read(&obd->obd_refcount));
452 spin_unlock(&obd_dev_lock);
456 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
457 specified, then only the client with that uuid is returned,
458 otherwise any client connected to the tgt is returned. */
/* The type is matched with strncmp() over strlen(typ_name) bytes, i.e. as a
 * prefix of the device's type name. The target is compared against
 * u.cli.cl_target_uuid. obd_dev_lock is held during the scan. */
459 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
460 const char * typ_name,
461 struct obd_uuid *grp_uuid)
465 spin_lock(&obd_dev_lock);
466 for (i = 0; i < class_devno_max(); i++) {
467 struct obd_device *obd = class_num2obd(i);
470 if ((strncmp(obd->obd_type->typ_name, typ_name,
471 strlen(typ_name)) == 0)) {
472 if (obd_uuid_equals(tgt_uuid,
473 &obd->u.cli.cl_target_uuid) &&
474 ((grp_uuid)? obd_uuid_equals(grp_uuid,
475 &obd->obd_uuid) : 1)) {
476 spin_unlock(&obd_dev_lock);
481 spin_unlock(&obd_dev_lock);
/*
 * Search for a client of \a tgt_uuid with class_find_client_obd(), first
 * with LUSTRE_MDC_NAME and then with LUSTRE_OSC_NAME.
 * NOTE(review): the MDC lookup passes NULL instead of \a grp_uuid --
 * confirm this is deliberate.
 */
486 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
487 struct obd_uuid *grp_uuid)
489 struct obd_device *obd;
491 obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
493 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
498 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
499 searching at *next, and if a device is found, the next index to look
500 at is saved in *next. If next is NULL, then the first matching device
501 will always be returned. */
/* A *next value is only accepted when it lies in [0, class_devno_max()).
 * The scan compares each device's obd_uuid with \a grp_uuid under
 * obd_dev_lock. */
502 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
508 else if (*next >= 0 && *next < class_devno_max())
513 spin_lock(&obd_dev_lock);
514 for (; i < class_devno_max(); i++) {
515 struct obd_device *obd = class_num2obd(i);
518 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
521 spin_unlock(&obd_dev_lock);
525 spin_unlock(&obd_dev_lock);
/*
 * Destroy the obd_device, obdo and import slab caches. Each destroy is
 * asserted to return 0.
 * NOTE(review): the assertion messages misspell "destroy" ("destropy",
 * "destory").
 */
531 void obd_cleanup_caches(void)
536 if (obd_device_cachep) {
537 rc = cfs_mem_cache_destroy(obd_device_cachep);
538 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
539 obd_device_cachep = NULL;
542 rc = cfs_mem_cache_destroy(obdo_cachep);
543 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
547 rc = cfs_mem_cache_destroy(import_cachep);
548 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
549 import_cachep = NULL;
/*
 * Create the obd_device, obdo and import slab caches, asserting first that
 * each pointer is still NULL.
 * obd_cleanup_caches() is also called from here -- presumably on the
 * failure path; the label is not visible.
 */
554 int obd_init_caches(void)
558 LASSERT(obd_device_cachep == NULL);
559 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
560 sizeof(struct obd_device),
562 if (!obd_device_cachep)
565 LASSERT(obdo_cachep == NULL);
566 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
571 LASSERT(import_cachep == NULL);
572 import_cachep = cfs_mem_cache_create("ll_import_cache",
573 sizeof(struct obd_import),
580 obd_cleanup_caches();
585 /* map connection to client */
/*
 * Translate a connection handle into its export by looking the cookie up
 * with class_handle2object().
 * A cookie of -1 is special-cased as "want a new connection".
 */
586 struct obd_export *class_conn2export(struct lustre_handle *conn)
588 struct obd_export *export;
592 CDEBUG(D_CACHE, "looking for null handle\n");
596 if (conn->cookie == -1) { /* this means assign a new connection */
597 CDEBUG(D_CACHE, "want a new connection\n");
601 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
602 export = class_handle2object(conn->cookie);
606 struct obd_device *class_exp2obd(struct obd_export *exp)
/*
 * Map a connection handle to the obd_device of its export.
 * exp_obd is read and the export is then released with class_export_put(),
 * so the device pointer is not backed by an export reference afterwards.
 */
613 struct obd_device *class_conn2obd(struct lustre_handle *conn)
615 struct obd_export *export;
616 export = class_conn2export(conn);
618 struct obd_device *obd = export->exp_obd;
619 class_export_put(export);
/* Return the client import (u.cli.cl_import) of the export's obd_device. */
625 struct obd_import *class_exp2cliimp(struct obd_export *exp)
627 struct obd_device *obd = exp->exp_obd;
630 return obd->u.cli.cl_import;
/* Return the client import (u.cli.cl_import) of the obd_device that
 * class_conn2obd() finds for \a conn. */
633 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
635 struct obd_device *obd = class_conn2obd(conn);
638 return obd->u.cli.cl_import;
641 /* Export management functions */
/* Callback passed to class_handle_hash() for exports; takes an export
 * reference. */
642 static void export_handle_addref(void *export)
644 class_export_get(export);
647 /* called from mds_commit_cb() in context of journal commit callback
648 * and cannot call any blocking functions. */
/* On the final put the export is queued on obd_zombie_exports through
 * exp_obd_chain (asserted empty first) under obd_zombie_impexp_lock, and
 * obd_zombie_impexp_notify() is called. */
649 void __class_export_put(struct obd_export *exp)
651 if (atomic_dec_and_test(&exp->exp_refcount)) {
652 LASSERT (list_empty(&exp->exp_obd_chain));
654 CDEBUG(D_IOCTL, "final put %p/%s\n",
655 exp, exp->exp_client_uuid.uuid);
657 spin_lock(&obd_zombie_impexp_lock);
658 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
659 spin_unlock(&obd_zombie_impexp_lock);
661 obd_zombie_impexp_notify();
664 EXPORT_SYMBOL(__class_export_put);
/*
 * Tear down an export whose refcount is zero: release its connection if it
 * has one, assert that its reply and request lists are empty, call
 * obd_destroy_export() and free it with OBD_FREE_RCU().
 */
666 void class_export_destroy(struct obd_export *exp)
668 struct obd_device *obd = exp->exp_obd;
670 LASSERT (atomic_read(&exp->exp_refcount) == 0);
672 CDEBUG(D_IOCTL, "destroying export %p/%s\n", exp,
673 exp->exp_client_uuid.uuid);
675 LASSERT(obd != NULL);
677 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
678 if (exp->exp_connection)
679 ptlrpc_put_connection_superhack(exp->exp_connection);
681 LASSERT(list_empty(&exp->exp_outstanding_replies));
682 LASSERT(list_empty(&exp->exp_uncommitted_replies));
683 LASSERT(list_empty(&exp->exp_req_replay_queue));
684 LASSERT(list_empty(&exp->exp_queued_rpc));
685 obd_destroy_export(exp);
687 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
691 /* Creates a new export, adds it to the hash table, and returns a
692 * pointer to it. The refcount is 2: one for the hash reference, and
693 * one for the pointer returned by this function. */
/* Failures visible here return ERR_PTR(-ENOMEM) or ERR_PTR(-EALREADY).
 * Unless \a cluuid equals the obd's own uuid, the export is inserted into
 * obd_uuid_hash with lustre_hash_add_unique(). */
694 struct obd_export *class_new_export(struct obd_device *obd,
695 struct obd_uuid *cluuid)
697 struct obd_export *export;
700 OBD_ALLOC(export, sizeof(*export));
702 return ERR_PTR(-ENOMEM);
704 export->exp_conn_cnt = 0;
705 export->exp_lock_hash = NULL;
706 atomic_set(&export->exp_refcount, 2);
707 atomic_set(&export->exp_rpc_count, 0);
708 export->exp_obd = obd;
709 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
710 spin_lock_init(&export->exp_uncommitted_replies_lock);
711 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
712 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
713 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
715 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
716 class_handle_hash(&export->exp_handle, export_handle_addref);
717 export->exp_last_request_time = cfs_time_current_sec();
718 spin_lock_init(&export->exp_lock);
719 INIT_HLIST_NODE(&export->exp_uuid_hash);
720 INIT_HLIST_NODE(&export->exp_nid_hash);
722 export->exp_client_uuid = *cluuid;
723 obd_init_export(export);
725 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
726 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
727 &export->exp_uuid_hash);
/* NOTE(review): this path frees with OBD_FREE_PTR() right after unhashing
 * the handle, whereas class_export_destroy() uses OBD_FREE_RCU() -- confirm
 * no concurrent handle lookup can still hold the pointer. */
729 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
730 obd->obd_name, cluuid->uuid, rc);
731 class_handle_unhash(&export->exp_handle);
732 OBD_FREE_PTR(export);
733 return ERR_PTR(-EALREADY);
/* Link the export onto the device's export lists under obd_dev_lock. */
737 spin_lock(&obd->obd_dev_lock);
738 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
740 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
741 list_add_tail(&export->exp_obd_chain_timed,
742 &export->exp_obd->obd_exports_timed);
743 export->exp_obd->obd_num_exports++;
744 spin_unlock(&obd->obd_dev_lock);
748 EXPORT_SYMBOL(class_new_export);
/*
 * Detach an export from its device: unhash the handle, remove it from
 * obd_uuid_hash and from the export lists (under obd_dev_lock), adjust the
 * delayed/recoverable client counters (under obd_processing_task_lock) and
 * drop one export reference.
 */
750 void class_unlink_export(struct obd_export *exp)
752 class_handle_unhash(&exp->exp_handle);
754 spin_lock(&exp->exp_obd->obd_dev_lock);
755 /* delete an uuid-export hashitem from hashtables */
756 if (!hlist_unhashed(&exp->exp_uuid_hash))
757 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
758 &exp->exp_client_uuid,
759 &exp->exp_uuid_hash);
761 list_del_init(&exp->exp_obd_chain);
762 list_del_init(&exp->exp_obd_chain_timed);
763 exp->exp_obd->obd_num_exports--;
764 spin_unlock(&exp->exp_obd->obd_dev_lock);
765 /* Keep these counters valid at all times */
766 spin_lock_bh(&exp->exp_obd->obd_processing_task_lock);
767 if (exp->exp_delayed)
768 exp->exp_obd->obd_delayed_clients--;
769 else if (exp->exp_replay_needed)
770 exp->exp_obd->obd_recoverable_clients--;
771 spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock);
772 class_export_put(exp);
774 EXPORT_SYMBOL(class_unlink_export);
776 /* Import management functions */
/* Callback passed to class_handle_hash() for imports; takes an import
 * reference. */
777 static void import_handle_addref(void *import)
779 class_import_get(import);
/*
 * Take a reference on \a import.
 * The asserts require the count to be in [0, 0x5a5a5a) before the
 * increment.
 */
782 struct obd_import *class_import_get(struct obd_import *import)
784 LASSERT(atomic_read(&import->imp_refcount) >= 0);
785 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
786 atomic_inc(&import->imp_refcount);
787 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
788 atomic_read(&import->imp_refcount),
789 import->imp_obd->obd_name);
792 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on \a import. When the count reaches zero the import is
 * queued on obd_zombie_imports under obd_zombie_impexp_lock and
 * obd_zombie_impexp_notify() is called.
 */
794 void class_import_put(struct obd_import *import)
798 LASSERT(atomic_read(&import->imp_refcount) > 0);
799 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
800 LASSERT(list_empty(&import->imp_zombie_chain));
802 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
803 atomic_read(&import->imp_refcount) - 1,
804 import->imp_obd->obd_name);
806 if (atomic_dec_and_test(&import->imp_refcount)) {
807 CDEBUG(D_INFO, "final put import %p\n", import);
808 spin_lock(&obd_zombie_impexp_lock);
809 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
810 spin_unlock(&obd_zombie_impexp_lock);
812 obd_zombie_impexp_notify();
817 EXPORT_SYMBOL(class_import_put);
/*
 * Tear down an import whose refcount is zero: release imp_connection and
 * every entry on imp_conn_list, drop the reference on imp_obd and free the
 * import with OBD_FREE_RCU().
 */
819 void class_import_destroy(struct obd_import *import)
823 CDEBUG(D_IOCTL, "destroying import %p\n", import);
825 LASSERT(atomic_read(&import->imp_refcount) == 0);
827 ptlrpc_put_connection_superhack(import->imp_connection);
829 while (!list_empty(&import->imp_conn_list)) {
830 struct obd_import_conn *imp_conn;
832 imp_conn = list_entry(import->imp_conn_list.next,
833 struct obd_import_conn, oic_item);
834 list_del(&imp_conn->oic_item);
835 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
836 OBD_FREE(imp_conn, sizeof(*imp_conn));
839 class_decref(import->imp_obd);
840 OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
/* Initialise an imp_at: at_init() on iat_net_latency and on each of the
 * IMP_AT_MAX_PORTALS iat_service_estimate[] entries, the latter seeded with
 * INITIAL_CONNECT_TIMEOUT. */
844 static void init_imp_at(struct imp_at *at) {
846 at_init(&at->iat_net_latency, 0, 0);
847 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
848 /* max service estimates are tracked on the server side, so
849 don't use the AT history here, just use the last reported
850 val. (But keep hist for proc histogram, worst_ever) */
851 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialise an import for \a obd.
 * Takes a reference on the device (class_incref()), starts the import in
 * LUSTRE_IMP_NEW with a refcount of 2, and hashes its handle.
 * HAVE_DEFAULT_V2_CONNECT is #defined immediately before the #ifdef that
 * tests it, so the V2 magic assignment is the one that is compiled.
 */
856 struct obd_import *class_new_import(struct obd_device *obd)
858 struct obd_import *imp;
860 OBD_ALLOC(imp, sizeof(*imp));
864 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
865 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
866 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
867 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
868 spin_lock_init(&imp->imp_lock);
869 imp->imp_last_success_conn = 0;
870 imp->imp_state = LUSTRE_IMP_NEW;
871 imp->imp_obd = class_incref(obd);
872 cfs_waitq_init(&imp->imp_recovery_waitq);
874 atomic_set(&imp->imp_refcount, 2);
875 atomic_set(&imp->imp_unregistering, 0);
876 atomic_set(&imp->imp_inflight, 0);
877 atomic_set(&imp->imp_replay_inflight, 0);
878 atomic_set(&imp->imp_inval_count, 0);
879 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
880 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
881 class_handle_hash(&imp->imp_handle, import_handle_addref);
882 init_imp_at(&imp->imp_at);
884 /* b1_8 supports both v1 & v2. but HEAD only supports v2.
887 #define HAVE_DEFAULT_V2_CONNECT 1
888 #ifdef HAVE_DEFAULT_V2_CONNECT
889 /* the default magic is V2, will be used in connect RPC, and
890 * then adjusted according to the flags in request/reply. */
891 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
893 /* the default magic is V1, will be used in connect RPC, and
894 * then adjusted according to the flags in request/reply. */
895 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V1;
900 EXPORT_SYMBOL(class_new_import);
/*
 * Start destroying an import: unhash its handle, bump imp_generation under
 * imp_lock and drop one reference with class_import_put().
 */
902 void class_destroy_import(struct obd_import *import)
904 LASSERT(import != NULL);
905 LASSERT(import != LP_POISON);
907 class_handle_unhash(&import->imp_handle);
909 spin_lock(&import->imp_lock);
910 import->imp_generation++;
911 spin_unlock(&import->imp_lock);
913 class_import_put(import);
915 EXPORT_SYMBOL(class_destroy_import);
917 /* A connection defines an export context in which preallocation can
918 be managed. This releases the export pointer reference, and returns
919 the export handle, so the export refcount is 1 when this function returns. */
/*
 * Create an export for \a cluuid on \a obd and store its handle cookie in
 * \a conn. The pointer reference from class_new_export() is dropped again
 * before returning; a failed class_new_export() is reported via PTR_ERR().
 */
921 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
922 struct obd_uuid *cluuid)
924 struct obd_export *export;
925 LASSERT(conn != NULL);
926 LASSERT(obd != NULL);
927 LASSERT(cluuid != NULL);
930 export = class_new_export(obd, cluuid);
932 RETURN(PTR_ERR(export));
934 conn->cookie = export->exp_handle.h_cookie;
935 class_export_put(export);
937 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
938 cluuid->uuid, conn->cookie);
941 EXPORT_SYMBOL(class_connect);
943 /* This function removes 1-3 references from the export:
944 * 1 - for export pointer passed
945 * and, if a disconnect is really needed:
946 * 2 - removing from hash
947 * 3 - in client_unlink_export
948 * The export pointer passed to this function can be destroyed */
/*
 * A NULL export is special-cased with a debug message. Otherwise
 * exp_disconnected is read and set under exp_lock, so only the first
 * caller goes on to remove the export from obd_nid_hash and call
 * class_unlink_export(). class_export_put() is called at the end.
 */
949 int class_disconnect(struct obd_export *export)
951 int already_disconnected;
954 if (export == NULL) {
956 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
960 spin_lock(&export->exp_lock);
961 already_disconnected = export->exp_disconnected;
962 export->exp_disconnected = 1;
963 spin_unlock(&export->exp_lock);
966 /* class_cleanup(), abort_recovery(), and class_fail_export()
967 * all end up in here, and if any of them race we shouldn't
968 * call extra class_export_puts(). */
969 if (already_disconnected)
970 GOTO(no_disconn, already_disconnected);
972 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
973 export->exp_handle.h_cookie);
976 if (!hlist_unhashed(&export->exp_nid_hash))
977 lustre_hash_del(export->exp_obd->obd_nid_hash,
978 &export->exp_connection->c_peer.nid,
979 &export->exp_nid_hash);
981 class_unlink_export(export);
984 class_export_put(export);
/*
 * Disconnect every export on \a list, storing \a flags in exp_flags first
 * (under exp_lock).
 * An export whose client uuid equals its obd's uuid is not disconnected; it
 * is only taken off the list and released.
 */
988 static void class_disconnect_export_list(struct list_head *list,
989 enum obd_option flags)
992 struct obd_export *exp;
995 /* It's possible that an export may disconnect itself, but
996 * nothing else will be added to this list. */
997 while (!list_empty(list)) {
998 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
999 /* hold a reference so that CDEBUG is still safe after obd_disconnect */
1000 class_export_get(exp);
1002 spin_lock(&exp->exp_lock);
1003 exp->exp_flags = flags;
1004 spin_unlock(&exp->exp_lock);
1006 if (obd_uuid_equals(&exp->exp_client_uuid,
1007 &exp->exp_obd->obd_uuid)) {
1009 "exp %p export uuid == obd uuid, don't discon\n",
1011 /* Need to delete this now so we don't end up pointing
1012 * to work_list later when this export is cleaned up. */
1013 list_del_init(&exp->exp_obd_chain);
1014 class_export_put(exp);
1018 class_export_get(exp);
1019 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1020 "last request at %ld\n",
1021 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1022 exp, exp->exp_last_request_time);
1024 /* release one export reference anyway */
1025 rc = obd_disconnect(exp);
1026 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1027 obd_export_nid2str(exp), exp, rc);
1028 class_export_put(exp);
/*
 * Disconnect all exports of \a obd: obd_delayed_exports and obd_exports are
 * spliced onto a private work list under obd_dev_lock, then disconnected
 * with the flags from exp_flags_from_obd().
 */
1033 void class_disconnect_exports(struct obd_device *obd)
1035 struct list_head work_list;
1038 /* Move all of the exports from obd_exports to a work list, en masse. */
1039 CFS_INIT_LIST_HEAD(&work_list);
1040 spin_lock(&obd->obd_dev_lock);
1041 list_splice_init(&obd->obd_delayed_exports, &work_list);
1042 list_splice_init(&obd->obd_exports, &work_list);
1043 spin_unlock(&obd->obd_dev_lock);
1045 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1046 "disconnecting them\n", obd->obd_minor, obd);
1047 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd));
1050 EXPORT_SYMBOL(class_disconnect_exports);
1052 /* Remove exports that have not completed recovery. */
/* Exports with exp_replay_needed set are moved from obd_exports to a work
 * list under obd_dev_lock (each one counted in obd_stale_clients) and then
 * disconnected with \a flags. */
1053 void class_disconnect_stale_exports(struct obd_device *obd,
1054 enum obd_option flags)
1056 struct list_head work_list;
1057 struct list_head *pos, *n;
1058 struct obd_export *exp;
1061 CFS_INIT_LIST_HEAD(&work_list);
1062 spin_lock(&obd->obd_dev_lock);
1063 list_for_each_safe(pos, n, &obd->obd_exports) {
1064 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1065 if (exp->exp_replay_needed) {
1066 list_move(&exp->exp_obd_chain, &work_list);
1067 obd->obd_stale_clients++;
1070 spin_unlock(&obd->obd_dev_lock);
1072 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1073 obd->obd_name, obd->obd_stale_clients);
1074 class_disconnect_export_list(&work_list, flags);
1077 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Disconnect delayed exports for which exp_expired() holds against
 * u.obt.obt_stale_export_age. Matching exports are moved off
 * obd_delayed_exports under obd_dev_lock before being disconnected.
 */
1079 void class_disconnect_expired_exports(struct obd_device *obd)
1081 struct list_head expired_list;
1082 struct obd_export *exp, *n;
1086 CFS_INIT_LIST_HEAD(&expired_list);
1087 spin_lock(&obd->obd_dev_lock);
1088 list_for_each_entry_safe(exp, n, &obd->obd_delayed_exports,
1090 if (exp_expired(exp, obd->u.obt.obt_stale_export_age)) {
1091 list_move(&exp->exp_obd_chain, &expired_list);
1095 spin_unlock(&obd->obd_dev_lock);
1100 CDEBUG(D_INFO, "%s: disconnecting %d expired exports\n",
1101 obd->obd_name, cnt);
1102 class_disconnect_export_list(&expired_list, exp_flags_from_obd(obd));
1106 EXPORT_SYMBOL(class_disconnect_expired_exports);
/*
 * Mark an export as delayed: set exp_delayed under exp_lock, take it off
 * the timed list and move it to obd_delayed_exports under obd_dev_lock,
 * then shift one client from obd_recoverable_clients to
 * obd_delayed_clients under obd_processing_task_lock.
 * NOTE(review): both LASSERTs read their field before the lock that guards
 * the corresponding update is taken.
 */
1108 void class_set_export_delayed(struct obd_export *exp)
1110 struct obd_device *obd = class_exp2obd(exp);
1112 LASSERT(!exp->exp_delayed);
1113 spin_lock(&exp->exp_lock);
1114 exp->exp_delayed = 1;
1115 spin_unlock(&exp->exp_lock);
1117 /* no need to ping delayed exports */
1118 spin_lock(&obd->obd_dev_lock);
1119 list_del_init(&exp->exp_obd_chain_timed);
1120 list_move_tail(&exp->exp_obd_chain, &obd->obd_delayed_exports);
1121 spin_unlock(&obd->obd_dev_lock);
1123 LASSERT(obd->obd_recoverable_clients > 0);
1125 spin_lock_bh(&obd->obd_processing_task_lock);
1126 obd->obd_delayed_clients++;
1127 obd->obd_recoverable_clients--;
1128 spin_unlock_bh(&obd->obd_processing_task_lock);
1130 CDEBUG(D_HA, "%s: set client %s as delayed\n",
1131 obd->obd_name, exp->exp_client_uuid.uuid);
1133 EXPORT_SYMBOL(class_set_export_delayed);
1136 * Manage exports that have not completed recovery.
/* Sorts the exports on obd_exports that still need replay into an evict
 * list (in recovery but without VBR support) and a delay list (when
 * obd_version_recov is set or the export is not in recovery). Without
 * HAVE_DELAYED_RECOVERY the delay list and obd_delayed_exports are spliced
 * onto the evict list as well. Delayed exports go through
 * class_set_export_delayed(); the evict list is disconnected. */
1138 void class_handle_stale_exports(struct obd_device *obd)
1140 struct list_head delay_list, evict_list;
1141 struct obd_export *exp, *n;
1145 CFS_INIT_LIST_HEAD(&delay_list);
1146 CFS_INIT_LIST_HEAD(&evict_list);
1147 spin_lock(&obd->obd_dev_lock);
1148 list_for_each_entry_safe(exp, n, &obd->obd_exports, exp_obd_chain) {
1149 LASSERT(!exp->exp_delayed);
1150 /* clients finished recovery */
1151 if (!exp->exp_replay_needed)
1153 /* connected non-vbr clients are evicted */
1154 if (exp->exp_in_recovery && !exp_connect_vbr(exp)) {
1155 obd->obd_stale_clients++;
1156 list_move_tail(&exp->exp_obd_chain, &evict_list);
1159 if (obd->obd_version_recov || !exp->exp_in_recovery) {
1160 list_move_tail(&exp->exp_obd_chain, &delay_list);
1164 #ifndef HAVE_DELAYED_RECOVERY
1165 /* delayed recovery is turned off, evict all delayed exports */
1166 list_splice_init(&delay_list, &evict_list);
1167 list_splice_init(&obd->obd_delayed_exports, &evict_list);
1168 obd->obd_stale_clients += delayed;
1170 spin_unlock(&obd->obd_dev_lock);
1172 list_for_each_entry_safe(exp, n, &delay_list, exp_obd_chain) {
1173 class_set_export_delayed(exp);
1174 exp->exp_last_request_time = cfs_time_current_sec();
1176 LASSERT(list_empty(&delay_list));
1178 /* evict clients without VBR support */
1179 class_disconnect_export_list(&evict_list, exp_flags_from_obd(obd));
1183 EXPORT_SYMBOL(class_handle_stale_exports);
/*
 * Allocate an obd_io_group and initialise its lock, pending count (0),
 * refcount (1), wait queue and callback list.
 * \a oig_out is presumably where the new group is handed back -- the
 * assignment is not visible here.
 */
1185 int oig_init(struct obd_io_group **oig_out)
1187 struct obd_io_group *oig;
1190 OBD_ALLOC(oig, sizeof(*oig));
1194 spin_lock_init(&oig->oig_lock);
1196 oig->oig_pending = 0;
1197 atomic_set(&oig->oig_refcount, 1);
1198 cfs_waitq_init(&oig->oig_waitq);
1199 CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1204 EXPORT_SYMBOL(oig_init);
/* Take a reference on \a oig. */
1206 static inline void oig_grab(struct obd_io_group *oig)
1208 atomic_inc(&oig->oig_refcount);
/* Drop a reference on \a oig and free it when the count reaches zero. */
1211 void oig_release(struct obd_io_group *oig)
1213 if (atomic_dec_and_test(&oig->oig_refcount))
1214 OBD_FREE(oig, sizeof(*oig));
1216 EXPORT_SYMBOL(oig_release);
/* Queue \a occ at the tail of oig_occ_list under oig_lock. */
1218 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1221 CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1222 spin_lock(&oig->oig_lock);
1228 list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1230 spin_unlock(&oig->oig_lock);
1235 EXPORT_SYMBOL(oig_add_one);
/*
 * Record completion of \a occ: remove it from the group's list and
 * decrement oig_pending under oig_lock. When the count drops to zero or
 * below, the group's wait queue is signalled after the lock is released.
 * The CDEBUG reads oig fields without the lock, as its own text admits.
 */
1237 void oig_complete_one(struct obd_io_group *oig,
1238 struct oig_callback_context *occ, int rc)
1240 cfs_waitq_t *wake = NULL;
1243 spin_lock(&oig->oig_lock);
1246 list_del_init(&occ->occ_oig_item);
1248 old_rc = oig->oig_rc;
1249 if (oig->oig_rc == 0 && rc != 0)
1252 if (--oig->oig_pending <= 0)
1253 wake = &oig->oig_waitq;
1255 spin_unlock(&oig->oig_lock);
1257 CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1258 "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1261 cfs_waitq_signal(wake);
1264 EXPORT_SYMBOL(oig_complete_one);
/* Test oig_pending <= 0 under oig_lock; used as the wait condition in
 * oig_wait(). */
1266 static int oig_done(struct obd_io_group *oig)
1269 spin_lock(&oig->oig_lock);
1270 if (oig->oig_pending <= 0)
1272 spin_unlock(&oig->oig_lock);
/*
 * Interrupt callback installed by oig_wait(): calls occ_interrupted() on
 * the callback contexts of the group, setting `interrupted` on each one
 * before dropping oig_lock around the call so it is not visited twice.
 */
1276 static void interrupted_oig(void *data)
1278 struct obd_io_group *oig = data;
1279 struct oig_callback_context *occ;
1281 spin_lock(&oig->oig_lock);
1282 /* We need to restart the processing each time we drop the lock, as
1283 * it is possible other threads called oig_complete_one() to remove
1284 * an entry elsewhere in the list while we dropped lock. We need to
1285 * drop the lock because osc_ap_completion() calls oig_complete_one()
1286 * which re-gets this lock ;-) as well as a lock ordering issue. */
1288 list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
1289 if (occ->interrupted)
1291 occ->interrupted = 1;
1292 spin_unlock(&oig->oig_lock);
1293 occ->occ_interrupted(occ);
1294 spin_lock(&oig->oig_lock);
1297 spin_unlock(&oig->oig_lock);
/*
 * Wait until oig_done() holds. The first wait uses LWI_INTR with
 * interrupted_oig() as the interrupt callback; the wait info is zeroed
 * after each wait and the wait repeats while rc is -EINTR.
 * On exit oig_pending is asserted to be 0.
 */
1300 int oig_wait(struct obd_io_group *oig)
1302 struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1305 CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1308 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1309 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1310 /* we can't continue until the oig has emptied and stopped
1311 * referencing state that the caller will free upon return */
1313 lwi = (struct l_wait_info){ 0, };
1314 } while (rc == -EINTR);
1316 LASSERTF(oig->oig_pending == 0,
1317 "exiting oig_wait(oig = %p) with %d pending\n", oig,
1320 CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1323 EXPORT_SYMBOL(oig_wait);
/* Mark @exp as failed and disconnect it.
 *
 * exp_failed is read and set to 1 in a single exp_lock critical section,
 * so only the first caller sees already_failed == 0.  For an export that
 * was already failed a "skipping" message is logged.  Otherwise the
 * debug log is dumped when obd_dump_on_timeout is set, an extra export
 * reference is taken (see the comment below) and obd_disconnect() is
 * called; its outcome is logged.
 *
 * NOTE(review): the early return after the "skipping" message and the
 * test selecting between the CERROR and the final CDEBUG are not visible
 * in this view -- confirm against the full source. */
1325 void class_fail_export(struct obd_export *exp)
1327 int rc, already_failed;
1329 spin_lock(&exp->exp_lock);
1330 already_failed = exp->exp_failed;
1331 exp->exp_failed = 1;
1332 spin_unlock(&exp->exp_lock);
1334 if (already_failed) {
1335 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1336 exp, exp->exp_client_uuid.uuid);
1340 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1341 exp, exp->exp_client_uuid.uuid);
1343 if (obd_dump_on_timeout)
1344 libcfs_debug_dumplog();
1346 /* Most callers into obd_disconnect are removing their own reference
1347 * (request, for example) in addition to the one from the hash table.
1348 * We don't have such a reference here, so make one. */
1349 class_export_get(exp);
1350 rc = obd_disconnect(exp);
1352 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1354 CDEBUG(D_HA, "disconnected export %p/%s\n",
1355 exp, exp->exp_client_uuid.uuid);
1357 EXPORT_SYMBOL(class_fail_export);
/* Return the NID of the peer behind @exp as a string: when the export
 * has a connection, the result of libcfs_nid2str() on the connection's
 * peer nid is returned.
 *
 * NOTE(review): the value returned when exp_connection is NULL is not
 * visible in this view; presumably a fixed placeholder string -- confirm.
 * The lifetime of the string returned by libcfs_nid2str() is also not
 * established here. */
1359 char *obd_export_nid2str(struct obd_export *exp)
1361 if (exp->exp_connection != NULL)
1362 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1366 EXPORT_SYMBOL(obd_export_nid2str);
/* Evict the export(s) of @obd whose peer NID matches the string @nid.
 *
 * @nid is converted with libcfs_str2nid() and used as the key for a
 * lookup in obd->obd_nid_hash.  A hit is sanity-checked (its connection's
 * nid must equal the key), failed with class_fail_export() and then
 * released with class_export_put().  A debug message is logged when
 * nothing was evicted.
 *
 * Returns exports_evicted.
 *
 * NOTE(review): the loop construct, the statement taken when the lookup
 * misses and the increment of exports_evicted are not visible in this
 * view; presumably the lookup is repeated until it misses and the
 * counter is bumped per hit.  The class_export_put() presumably drops a
 * reference taken by lustre_hash_lookup() -- confirm both. */
1368 int obd_export_evict_by_nid(struct obd_device *obd, char *nid)
1370 struct obd_export *doomed_exp = NULL;
1371 int exports_evicted = 0;
1373 lnet_nid_t nid_key = libcfs_str2nid(nid);
1376 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1378 if (doomed_exp == NULL)
1381 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1382 "nid %s found, wanted nid %s, requested nid %s\n",
1383 obd_export_nid2str(doomed_exp),
1384 libcfs_nid2str(nid_key), nid);
1387 CDEBUG(D_HA, "%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1388 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1390 class_fail_export(doomed_exp);
1391 class_export_put(doomed_exp);
1394 if (!exports_evicted)
1395 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1396 obd->obd_name, nid);
1397 return exports_evicted;
1399 EXPORT_SYMBOL(obd_export_evict_by_nid);
/* Evict the export of @obd whose client UUID matches the string @uuid.
 *
 * @uuid is converted with obd_str2uuid().  A request naming the device's
 * own UUID is rejected with an error message, returning exports_evicted
 * (still 0 at that point).  Otherwise the UUID is looked up in
 * obd->obd_uuid_hash: a miss is reported with CERROR, a hit is logged
 * with CWARN, failed with class_fail_export() and released with
 * class_export_put().
 *
 * Returns exports_evicted.
 *
 * NOTE(review): the increment of exports_evicted on the success path is
 * not visible in this view, and class_export_put() presumably drops a
 * reference taken by lustre_hash_lookup() -- confirm both. */
1401 int obd_export_evict_by_uuid(struct obd_device *obd, char *uuid)
1403 struct obd_export *doomed_exp = NULL;
1404 struct obd_uuid doomed_uuid;
1405 int exports_evicted = 0;
1407 obd_str2uuid(&doomed_uuid, uuid);
1408 if(obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1409 CERROR("%s: can't evict myself\n", obd->obd_name);
1410 return exports_evicted;
1413 doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1415 if (doomed_exp == NULL) {
1416 CERROR("%s: can't disconnect %s: no exports found\n",
1417 obd->obd_name, uuid);
1419 CWARN("%s: evicting %s at adminstrative request\n",
1420 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1421 class_fail_export(doomed_exp);
1422 class_export_put(doomed_exp);
1426 return exports_evicted;
1428 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/* Destroy queued zombie imports and exports.
 *
 * Each pass takes obd_zombie_impexp_lock, detaches the first entry (if
 * any) of obd_zombie_imports and of obd_zombie_exports, drops the lock
 * and only then calls class_import_destroy() / class_export_destroy(),
 * so the destructors run without the zombie lock held.  Passes repeat
 * for as long as a pass picked up an import or an export.
 *
 * NOTE(review): the loop head, the per-pass reset of import/export and
 * the guards in front of the destroy calls are not visible in this view;
 * presumably both pointers start each pass as NULL and each destroy call
 * is skipped for a NULL pointer -- confirm. */
1430 void obd_zombie_impexp_cull(void)
1432 struct obd_import *import;
1433 struct obd_export *export;
1436 spin_lock (&obd_zombie_impexp_lock);
1439 if (!list_empty(&obd_zombie_imports)) {
1440 import = list_entry(obd_zombie_imports.next,
1443 list_del(&import->imp_zombie_chain);
1447 if (!list_empty(&obd_zombie_exports)) {
1448 export = list_entry(obd_zombie_exports.next,
1451 list_del_init(&export->exp_obd_chain);
1454 spin_unlock(&obd_zombie_impexp_lock);
1457 class_import_destroy(import);
1460 class_export_destroy(export);
1462 } while (import != NULL || export != NULL);
/* State shared between the zombie culling thread and its controllers. */
/* Completed by obd_zombie_impexp_thread() once it has started (or failed
 * to daemonize); obd_zombie_impexp_init() waits on it. */
1465 static struct completion obd_zombie_start;
/* Completed by obd_zombie_impexp_thread() when its loop has ended;
 * obd_zombie_impexp_stop() waits on it. */
1466 static struct completion obd_zombie_stop;
/* Bit field; the OBD_ZOMBIE_STOP bit asks the thread to terminate. */
1467 static unsigned long obd_zombie_flags;
/* Wait queue the thread sleeps on; signalled by
 * obd_zombie_impexp_notify(). */
1468 static cfs_waitq_t obd_zombie_waitq;
/* Idle test for the zombie machinery.  Under obd_zombie_impexp_lock, rc
 * is computed as true only when both zombie lists are empty and the
 * OBD_ZOMBIE_STOP bit is not set.  @arg is not used by the visible code.
 *
 * The kernel thread waits on the negation of this test; liblustre
 * registers the function as an idle callback.
 *
 * NOTE(review): the return statement is not visible in this view;
 * presumably rc is returned -- confirm. */
1474 int obd_zombi_impexp_check(void *arg)
1478 spin_lock(&obd_zombie_impexp_lock);
1479 rc = list_empty(&obd_zombie_imports) &&
1480 list_empty(&obd_zombie_exports) &&
1481 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1483 spin_unlock(&obd_zombie_impexp_lock);
/* Signal obd_zombie_waitq, the queue obd_zombie_impexp_thread() sleeps
 * on, so that it re-evaluates its wait condition. */
1488 static void obd_zombie_impexp_notify(void)
1490 cfs_waitq_signal(&obd_zombie_waitq);
/* Body of the zombie culling thread ("obd_zombid").
 *
 * obd_zombie_start is completed on both the daemonize-failure path and
 * the success path, so the waiter in obd_zombie_impexp_init() is always
 * released.  The main loop then runs until OBD_ZOMBIE_STOP is set: it
 * sleeps on obd_zombie_waitq until obd_zombi_impexp_check() reports
 * "not idle" (something is queued or a stop was requested) and then
 * calls obd_zombie_impexp_cull().  obd_zombie_stop is completed when the
 * loop ends.
 *
 * NOTE(review): the return statements are not visible in this view;
 * presumably the daemonize error is returned on failure -- confirm. */
1495 static int obd_zombie_impexp_thread(void *unused)
1499 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1500 complete(&obd_zombie_start);
1504 complete(&obd_zombie_start);
1506 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1507 struct l_wait_info lwi = { 0 };
1509 l_wait_event(obd_zombie_waitq, !obd_zombi_impexp_check(NULL), &lwi);
1511 obd_zombie_impexp_cull();
1514 complete(&obd_zombie_stop);
1519 #else /* ! KERNEL */
/* Nesting counter used by obd_zombi_impexp_kill() so that only the
 * outermost invocation performs the cull. */
1521 static atomic_t zombi_recur = ATOMIC_INIT(0);
/* Handles returned by liblustre_register_wait_callback() and
 * liblustre_register_idle_callback() in obd_zombie_impexp_init(); they
 * are handed back to the deregister calls in obd_zombie_impexp_stop(). */
1522 static void *obd_zombi_impexp_work_cb;
1523 static void *obd_zombi_impexp_idle_cb;
/* liblustre wait callback: cull zombie imports/exports.
 *
 * zombi_recur is incremented on entry and decremented on exit; the cull
 * runs only when the increment yields 1, i.e. when no other invocation
 * is already in progress, which prevents recursive culling.
 *
 * NOTE(review): the return value is not visible in this view, and @arg
 * is not used by the visible code. */
1525 int obd_zombi_impexp_kill(void *arg)
1529 if (atomic_inc_return(&zombi_recur) == 1) {
1530 obd_zombie_impexp_cull();
1533 atomic_dec(&zombi_recur);
/* Set up the zombie import/export machinery: initialise both zombie
 * lists, the lock protecting them, the start/stop completions and the
 * wait queue.
 *
 * Two start-up variants are visible: one launches
 * obd_zombie_impexp_thread() with cfs_kernel_thread() and waits for
 * obd_zombie_start; the other registers obd_zombi_impexp_kill() as a
 * liblustre wait callback and obd_zombi_impexp_check() as an idle
 * callback, saving the returned handles.
 *
 * NOTE(review): the preprocessor conditionals selecting between the two
 * variants, the handling of a cfs_kernel_thread() failure and the return
 * value are not visible in this view -- confirm against the full
 * source. */
1539 int obd_zombie_impexp_init(void)
1543 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1544 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1545 spin_lock_init(&obd_zombie_impexp_lock);
1546 init_completion(&obd_zombie_start);
1547 init_completion(&obd_zombie_stop);
1548 cfs_waitq_init(&obd_zombie_waitq);
1551 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1555 wait_for_completion(&obd_zombie_start);
1558 obd_zombi_impexp_work_cb =
1559 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1560 &obd_zombi_impexp_kill, NULL);
1562 obd_zombi_impexp_idle_cb =
1563 liblustre_register_idle_callback("obd_zombi_impexp_check",
1564 &obd_zombi_impexp_check, NULL);
/* Shut down the zombie import/export machinery.
 *
 * Sets OBD_ZOMBIE_STOP and signals the wait queue so the culling thread
 * notices the request, then waits on obd_zombie_stop until the thread
 * has left its loop.  The liblustre variant instead deregisters the wait
 * and idle callbacks registered by obd_zombie_impexp_init().
 *
 * NOTE(review): the preprocessor conditionals separating the two
 * variants and the end of this function are not visible in this view --
 * confirm against the full source. */
1571 void obd_zombie_impexp_stop(void)
1573 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1574 obd_zombie_impexp_notify();
1576 wait_for_completion(&obd_zombie_stop);
1578 liblustre_deregister_wait_callback(obd_zombi_impexp_work_cb);
1579 liblustre_deregister_idle_callback(obd_zombi_impexp_idle_cb);