1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
6 * This file is part of the Lustre file system, http://www.lustre.org
7 * Lustre is a trademark of Cluster File Systems, Inc.
9 * You may have signed or agreed to another license before downloading
10 * this software. If so, you are bound by the terms and conditions
11 * of that agreement, and the following does not apply to you. See the
12 * LICENSE file included with this distribution for more information.
14 * If you did not agree to a different license, then this copy of Lustre
15 * is open source software; you can redistribute it and/or modify it
16 * under the terms of version 2 of the GNU General Public License as
17 * published by the Free Software Foundation.
19 * In either case, Lustre is distributed in the hope that it will be
20 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 * license text for more details.
24 * These are the only exported functions; they provide some generic
25 * infrastructure for managing object devices.
 */
28 #define DEBUG_SUBSYSTEM S_CLASS
30 #include <linux/kmod.h> /* for request_module() */
31 #include <linux/module.h>
33 #include <liblustre.h>
35 #include <linux/lustre_mds.h>
36 #include <linux/obd_ost.h>
37 #include <linux/obd_class.h>
38 #include <linux/lprocfs_status.h>
/* List of registered obd types (defined elsewhere) and the spinlock this
 * file takes around every walk or modification of that list. */
40 extern struct list_head obd_types;
41 static spinlock_t obd_types_lock = SPIN_LOCK_UNLOCKED;
/* Slab caches created by obd_init_caches() and destroyed by
 * obd_cleanup_caches(); NULL while not created. */
43 kmem_cache_t *obdo_cachep = NULL;
44 EXPORT_SYMBOL(obdo_cachep);
45 kmem_cache_t *import_cachep = NULL;
/* Function-pointer hooks called from this file when exports and imports
 * are torn down.
 * NOTE(review): nothing in this file assigns them -- presumably the ptlrpc
 * module installs them at load time; confirm. */
47 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
48 void (*ptlrpc_abort_inflight_superhack)(struct obd_import *imp);
/*
51 * support functions: we could use inter-module communication, but this
52 * is more portable to other OS's
 */
/* Look up a registered obd type by name.
 *
 * Walks the obd_types list under obd_types_lock and compares each entry's
 * typ_name with name using strcmp().  The lock is dropped as soon as a
 * match is seen, and again after the walk completes.
 * NOTE(review): the return statements are not visible in this copy --
 * presumably the matching type, or NULL when nothing matches. */
54 struct obd_type *class_search_type(char *name)
56 struct list_head *tmp;
57 struct obd_type *type;
59 spin_lock(&obd_types_lock);
60 list_for_each(tmp, &obd_types) {
61 type = list_entry(tmp, struct obd_type, typ_chain);
62 if (strcmp(type->typ_name, name) == 0) {
63 spin_unlock(&obd_types_lock);
67 spin_unlock(&obd_types_lock);
/* Find an obd type by name, trying to load its module if necessary, and
 * take a module reference on the owner of the type's ops.
 *
 * The module name requested is the type name, except that LUSTRE_MDT_NAME
 * is mapped to LUSTRE_MDS_NAME.  After a successful request_module() the
 * type list is searched again.
 * NOTE(review): the conditions guarding the module load and the final
 * try_module_get() are not visible in this copy.  The result of
 * try_module_get() is not used on the visible line; confirm whether a
 * failed get should be reported to the caller. */
71 struct obd_type *class_get_type(char *name)
73 struct obd_type *type = class_search_type(name);
78 if (strcmp(modname, LUSTRE_MDT_NAME) == 0)
79 modname = LUSTRE_MDS_NAME;
80 if (!request_module(modname)) {
81 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
82 type = class_search_type(name);
84 LCONSOLE_ERROR("Can't load module '%s'\n", modname);
89 try_module_get(type->typ_ops->o_owner);
/* Drop the module reference on the owner of this type's ops; the
 * counterpart of the try_module_get() in class_get_type(). */
93 void class_put_type(struct obd_type *type)
96 module_put(type->typ_ops->o_owner);
/* Register a new obd type.
 *
 * Allocates an obd_type, stores private copies of *ops and of the name
 * string, registers a procfs directory named after the type under
 * proc_lustre_root, and links the type onto obd_types under
 * obd_types_lock.  A name that class_search_type() already finds is
 * logged as already registered.
 * The tail of the function frees the name, the ops copy and the type
 * itself -- presumably the shared failure path; the labels and return
 * statements are not visible in this copy.
 * NOTE(review): the remaining parameters of the signature (the code below
 * uses a name string) are on a line missing from this copy. */
99 int class_register_type(struct obd_ops *ops, struct lprocfs_vars *vars,
102 struct obd_type *type;
106 LASSERT(strnlen(name, 1024) < 1024); /* sanity check */
108 if (class_search_type(name)) {
109 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
114 OBD_ALLOC(type, sizeof(*type));
118 OBD_ALLOC(type->typ_ops, sizeof(*type->typ_ops));
119 OBD_ALLOC(type->typ_name, strlen(name) + 1);
120 if (type->typ_ops == NULL || type->typ_name == NULL)
/* Copy the ops table by value and the name into the buffer sized above. */
123 *(type->typ_ops) = *ops;
124 strcpy(type->typ_name, name);
127 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
129 if (IS_ERR(type->typ_procroot)) {
130 rc = PTR_ERR(type->typ_procroot);
131 type->typ_procroot = NULL;
136 spin_lock(&obd_types_lock);
137 list_add(&type->typ_chain, &obd_types);
138 spin_unlock(&obd_types_lock);
/* The free sizes mirror the allocation sizes used above. */
143 if (type->typ_name != NULL)
144 OBD_FREE(type->typ_name, strlen(name) + 1);
145 if (type->typ_ops != NULL)
146 OBD_FREE (type->typ_ops, sizeof (*type->typ_ops));
147 OBD_FREE(type, sizeof(*type));
/* Unregister an obd type by name.
 *
 * A type that still has a non-zero typ_refcnt is not removed: the
 * condition is logged and only the ops copy is freed, leaving the name in
 * place for debugging.  Otherwise the procfs root is removed, the type is
 * unlinked from obd_types under obd_types_lock, and the name, ops copy
 * and type structure are freed.
 * NOTE(review): on the busy path typ_ops is freed but not visibly reset
 * here, while the normal path later tests typ_ops != NULL before freeing
 * it; confirm what OBD_FREE leaves in the pointer. */
151 int class_unregister_type(char *name)
153 struct obd_type *type = class_search_type(name);
157 CERROR("unknown obd type\n");
161 if (type->typ_refcnt) {
162 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
163 /* This is a bad situation, let's make the best of it */
164 /* Remove ops, but leave the name for debugging */
165 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
169 if (type->typ_procroot) {
170 lprocfs_remove(type->typ_procroot);
171 type->typ_procroot = NULL;
174 spin_lock(&obd_types_lock);
175 list_del(&type->typ_chain);
176 spin_unlock(&obd_types_lock);
177 OBD_FREE(type->typ_name, strlen(name) + 1);
178 if (type->typ_ops != NULL)
179 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
180 OBD_FREE(type, sizeof(*type));
182 } /* class_unregister_type */
/* Claim a free slot in the obd_dev[] table for a new device.
 *
 * Scans all MAX_OBD_DEVICES slots under obd_dev_lock.  A slot whose
 * obd_name equals name is reported as a duplicate, and the type and name
 * of an already chosen slot are cleared again.  The first slot with no
 * obd_type is zeroed and given the type and name.
 * The name pointer is stored as-is, not copied.
 * NOTE(review): the result assignments and the return are not visible in
 * this copy. */
184 struct obd_device *class_newdev(struct obd_type *type, char *name)
186 struct obd_device *result = NULL;
189 spin_lock(&obd_dev_lock);
190 for (i = 0 ; i < MAX_OBD_DEVICES; i++) {
191 struct obd_device *obd = &obd_dev[i];
192 if (obd->obd_name && (strcmp(name, obd->obd_name) == 0)) {
193 CERROR("Device %s already exists, won't add\n", name);
195 result->obd_type = NULL;
196 result->obd_name = NULL;
201 if (!result && !obd->obd_type) {
202 LASSERT(obd->obd_minor == i);
203 memset(obd, 0, sizeof(*obd));
205 obd->obd_type = type;
206 obd->obd_name = name;
207 CDEBUG(D_IOCTL, "Adding new device %s\n",
212 spin_unlock(&obd_dev_lock);
/* Return a device slot to the free pool.
 *
 * Under obd_dev_lock the whole structure is overwritten with 0x5a bytes,
 * then obd_type and obd_name are set to NULL (class_newdev() treats a NULL
 * obd_type as a free slot) and the saved minor number is restored. */
216 void class_release_dev(struct obd_device *obd)
218 int minor = obd->obd_minor;
220 spin_lock(&obd_dev_lock);
221 memset(obd, 0x5a, sizeof(*obd));
222 obd->obd_type = NULL;
223 obd->obd_minor = minor;
224 obd->obd_name = NULL;
225 spin_unlock(&obd_dev_lock);
/* Find the obd_dev[] slot of the device with the given name.
 *
 * Scans the table under obd_dev_lock.  A name match is only acted on when
 * the device has obd_attached set.
 * NOTE(review): the return statements are not visible in this copy;
 * class_name2obd() uses the result as an index into obd_dev[]. */
228 int class_name2dev(char *name)
235 spin_lock(&obd_dev_lock);
236 for (i = 0; i < MAX_OBD_DEVICES; i++) {
237 struct obd_device *obd = &obd_dev[i];
238 if (obd->obd_name && strcmp(name, obd->obd_name) == 0) {
239 /* Make sure we finished attaching before we give
240 out any references */
241 if (obd->obd_attached) {
242 spin_unlock(&obd_dev_lock);
248 spin_unlock(&obd_dev_lock);
/* Map a device name to its obd_device by indexing obd_dev[] with the
 * result of class_name2dev().
 * NOTE(review): the handling of a failed lookup is not visible in this
 * copy. */
253 struct obd_device *class_name2obd(char *name)
255 int dev = class_name2dev(name);
258 return &obd_dev[dev];
/* Find the obd_dev[] slot of the device whose obd_uuid equals uuid.
 *
 * Scans the table under obd_dev_lock, comparing with obd_uuid_equals().
 * NOTE(review): the return statements are not visible in this copy;
 * class_uuid2obd() uses the result as an index into obd_dev[]. */
261 int class_uuid2dev(struct obd_uuid *uuid)
265 spin_lock(&obd_dev_lock);
266 for (i = 0; i < MAX_OBD_DEVICES; i++) {
267 struct obd_device *obd = &obd_dev[i];
268 if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
269 spin_unlock(&obd_dev_lock);
273 spin_unlock(&obd_dev_lock);
/* Map a device uuid to its obd_device by indexing obd_dev[] with the
 * result of class_uuid2dev().
 * NOTE(review): the handling of a failed lookup is not visible in this
 * copy. */
278 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
280 int dev = class_uuid2dev(uuid);
283 return &obd_dev[dev];
/* Print one console line per device slot to the console.
 *
 * Runs under obd_dev_lock.  Each line shows the slot index, a status
 * string chosen from the obd_stopping / obd_set_up / obd_attached flags
 * (checked in that order), the type name, device name, uuid and current
 * reference count.
 * NOTE(review): the statement under the obd_type == NULL test is not
 * visible -- presumably free slots are skipped. */
286 void class_obd_list(void)
291 spin_lock(&obd_dev_lock);
292 for (i = 0; i < MAX_OBD_DEVICES; i++) {
293 struct obd_device *obd = &obd_dev[i];
294 if (obd->obd_type == NULL)
296 if (obd->obd_stopping)
298 else if (obd->obd_set_up)
300 else if (obd->obd_attached)
304 LCONSOLE(D_WARNING, "%3d %s %s %s %s %d\n",
305 i, status, obd->obd_type->typ_name,
306 obd->obd_name, obd->obd_uuid.uuid,
307 atomic_read(&obd->obd_refcount));
309 spin_unlock(&obd_dev_lock);
313 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
314 specified, then only the client with that uuid is returned,
315 otherwise any client connected to the tgt is returned.
    The device type is compared with strncmp() over strlen(typ_name)
    bytes, so typ_name matches as a prefix of the registered type name.
    The table is scanned under obd_dev_lock.
    NOTE(review): cl_import is dereferenced without a visible NULL check;
    confirm that every device of a matching type has an import here. */
316 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
318 struct obd_uuid *grp_uuid)
322 spin_lock(&obd_dev_lock);
323 for (i = 0; i < MAX_OBD_DEVICES; i++) {
324 struct obd_device *obd = &obd_dev[i];
325 if (obd->obd_type == NULL)
327 if ((strncmp(obd->obd_type->typ_name, typ_name,
328 strlen(typ_name)) == 0)) {
329 struct client_obd *cli = &obd->u.cli;
330 struct obd_import *imp = cli->cl_import;
331 if (obd_uuid_equals(tgt_uuid, &imp->imp_target_uuid) &&
332 ((grp_uuid)? obd_uuid_equals(grp_uuid,
333 &obd->obd_uuid) : 1)) {
334 spin_unlock(&obd_dev_lock);
339 spin_unlock(&obd_dev_lock);
/* Find a client device connected to tgt_uuid without knowing its type.
 *
 * Tries the LUSTRE_MDC_NAME type first and then LUSTRE_OSC_NAME.
 * Note that the MDC lookup passes NULL rather than grp_uuid, so the group
 * filter is not applied to it.
 * NOTE(review): the condition between the two lookups and the return are
 * not visible in this copy. */
344 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
345 struct obd_uuid *grp_uuid)
347 struct obd_device *obd;
349 obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
351 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
356 /* Iterate the obd_device list looking for devices that have grp_uuid.
357 Start searching at *next, and if a device is found, the next index to
358 look at is saved in *next. If next is NULL, then the first matching
359 device will always be returned.
    A *next value outside 0 .. MAX_OBD_DEVICES - 1 is not used as the
    starting index.  The scan runs under obd_dev_lock and compares
    obd_uuid with obd_uuid_equals(). */
360 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
366 else if (*next >= 0 && *next < MAX_OBD_DEVICES)
371 spin_lock(&obd_dev_lock);
372 for (; i < MAX_OBD_DEVICES; i++) {
373 struct obd_device *obd = &obd_dev[i];
374 if (obd->obd_type == NULL)
376 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
379 spin_unlock(&obd_dev_lock);
383 spin_unlock(&obd_dev_lock);
389 void obd_cleanup_caches(void)
393 LASSERTF(kmem_cache_destroy(obdo_cachep) == 0,
394 "Cannot destory ll_obdo_cache\n");
398 LASSERTF(kmem_cache_destroy(import_cachep) == 0,
399 "Cannot destory ll_import_cache\n");
400 import_cachep = NULL;
/* Create the slab caches used by this file.
 *
 * "ll_obdo_cache" holds struct obdo objects and "ll_import_cache" holds
 * struct obd_import objects.  Each pointer is asserted to be NULL before
 * its cache is created.
 * NOTE(review): the remaining kmem_cache_create() arguments, the failure
 * checks and the return statements are not visible in this copy; the
 * obd_cleanup_caches() call is presumably the failure path. */
405 int obd_init_caches(void)
409 LASSERT(obdo_cachep == NULL);
410 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
415 LASSERT(import_cachep == NULL);
416 import_cachep = kmem_cache_create("ll_import_cache",
417 sizeof(struct obd_import),
424 obd_cleanup_caches();
429 /* map connection to client
    Two cases are logged specially before the lookup: a null handle, and
    a cookie of -1, which means a new connection is wanted.  Otherwise
    the export is looked up from the cookie with class_handle2object().
    NOTE(review): the return statements are not visible in this copy;
    class_conn2obd() drops a reference on the result, so the lookup
    presumably returns the export with a reference held. */
430 struct obd_export *class_conn2export(struct lustre_handle *conn)
432 struct obd_export *export;
436 CDEBUG(D_CACHE, "looking for null handle\n");
440 if (conn->cookie == -1) { /* this means assign a new connection */
441 CDEBUG(D_CACHE, "want a new connection\n");
445 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
446 export = class_handle2object(conn->cookie);
/* Map an export to an obd device.
 * NOTE(review): the body of this function is not visible in this copy. */
450 struct obd_device *class_exp2obd(struct obd_export *exp)
/* Map a connection handle to the obd device of its export.
 *
 * Resolves the handle with class_conn2export(), reads exp_obd and then
 * drops the export reference again with class_export_put(), so the caller
 * holds no reference on the export afterwards. */
457 struct obd_device *class_conn2obd(struct lustre_handle *conn)
459 struct obd_export *export;
460 export = class_conn2export(conn);
462 struct obd_device *obd = export->exp_obd;
463 class_export_put(export);
/* Return the client import (u.cli.cl_import) of the device that owns
 * this export. */
469 struct obd_import *class_exp2cliimp(struct obd_export *exp)
471 struct obd_device *obd = exp->exp_obd;
474 return obd->u.cli.cl_import;
/* Return the client import (u.cli.cl_import) of the device that a
 * connection handle resolves to via class_conn2obd(). */
477 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
479 struct obd_device *obd = class_conn2obd(conn);
482 return obd->u.cli.cl_import;
485 /* Export management functions */
/* Add-reference callback registered with class_handle_hash() for export
 * handles; takes one reference on the export. */
486 static void export_handle_addref(void *export)
488 class_export_get(export);
/* Drop one reference on an export and destroy it when that was the last.
 *
 * On the final put the export's connection (if any) is released through
 * ptlrpc_put_connection_superhack(), the export is asserted to have no
 * outstanding replies and to be off the handle hash, obd_destroy_export()
 * is called and the memory is freed. */
491 void __class_export_put(struct obd_export *exp)
493 if (atomic_dec_and_test(&exp->exp_refcount)) {
494 struct obd_device *obd = exp->exp_obd;
495 CDEBUG(D_IOCTL, "destroying export %p/%s\n", exp,
496 exp->exp_client_uuid.uuid);
498 LASSERT(obd != NULL);
500 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
501 if (exp->exp_connection)
502 ptlrpc_put_connection_superhack(exp->exp_connection);
504 LASSERT(list_empty(&exp->exp_outstanding_replies));
505 LASSERT(list_empty(&exp->exp_handle.h_link));
506 obd_destroy_export(exp);
508 OBD_FREE(exp, sizeof(*exp));
512 EXPORT_SYMBOL(__class_export_put);
514 /* Creates a new export, adds it to the hash table, and returns a
515 * pointer to it. The refcount is 2: one for the hash reference, and
516 * one for the pointer returned by this function.
 *
 * The export is also linked onto the device's obd_exports list and onto
 * the tail of obd_exports_timed under obd_dev_lock, which bumps
 * obd_num_exports and takes a reference on the device.  The device is
 * asserted not to be stopping.  exp_last_request_time starts at the
 * current time. */
517 struct obd_export *class_new_export(struct obd_device *obd)
519 struct obd_export *export;
521 OBD_ALLOC(export, sizeof(*export));
523 CERROR("no memory! (minor %d)\n", obd->obd_minor);
527 export->exp_conn_cnt = 0;
528 atomic_set(&export->exp_refcount, 2);
529 export->exp_obd = obd;
530 INIT_LIST_HEAD(&export->exp_outstanding_replies);
531 /* XXX this should be in LDLM init */
532 INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
534 INIT_LIST_HEAD(&export->exp_handle.h_link);
535 class_handle_hash(&export->exp_handle, export_handle_addref);
536 export->exp_last_request_time = CURRENT_SECONDS;
537 spin_lock_init(&export->exp_lock);
539 spin_lock(&obd->obd_dev_lock);
540 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
541 atomic_inc(&obd->obd_refcount);
542 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
543 list_add_tail(&export->exp_obd_chain_timed,
544 &export->exp_obd->obd_exports_timed);
545 export->exp_obd->obd_num_exports++;
546 spin_unlock(&obd->obd_dev_lock);
548 obd_init_export(export);
551 EXPORT_SYMBOL(class_new_export);
/* Detach an export from the handle hash and from its device.
 *
 * The export is unhashed, removed from obd_exports and obd_exports_timed
 * under obd_dev_lock (decrementing obd_num_exports), and then one
 * reference is dropped. */
553 void class_unlink_export(struct obd_export *exp)
555 class_handle_unhash(&exp->exp_handle);
557 spin_lock(&exp->exp_obd->obd_dev_lock);
558 list_del_init(&exp->exp_obd_chain);
559 list_del_init(&exp->exp_obd_chain_timed);
560 exp->exp_obd->obd_num_exports--;
561 spin_unlock(&exp->exp_obd->obd_dev_lock);
563 class_export_put(exp);
565 EXPORT_SYMBOL(class_unlink_export);
567 /* Import management functions */
/* Add-reference callback registered with class_handle_hash() for import
 * handles; takes one reference on the import. */
568 static void import_handle_addref(void *import)
570 class_import_get(import);
/* Take one reference on an import.
 *
 * The current count is first asserted to be non-negative and below
 * 0x5a5a5a, then incremented and logged.
 * NOTE(review): 0x5a is the fill byte class_release_dev() uses, so the
 * upper bound presumably catches use of poisoned memory; confirm.  The
 * return statement is not visible in this copy. */
573 struct obd_import *class_import_get(struct obd_import *import)
575 LASSERT(atomic_read(&import->imp_refcount) >= 0);
576 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
577 atomic_inc(&import->imp_refcount);
578 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
579 atomic_read(&import->imp_refcount));
582 EXPORT_SYMBOL(class_import_get);
/* Drop one reference on an import and destroy it when that was the last.
 *
 * The log line reports the count as it will be after this put.  On the
 * final put the import's connection is released, every entry on
 * imp_conn_list is unlinked, has its connection released and is freed,
 * the import is asserted to be off the handle hash, and its memory is
 * freed. */
584 void class_import_put(struct obd_import *import)
588 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
589 atomic_read(&import->imp_refcount) - 1);
591 LASSERT(atomic_read(&import->imp_refcount) > 0);
592 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
593 if (!atomic_dec_and_test(&import->imp_refcount)) {
598 CDEBUG(D_IOCTL, "destroying import %p\n", import);
600 ptlrpc_put_connection_superhack(import->imp_connection);
602 while (!list_empty(&import->imp_conn_list)) {
603 struct obd_import_conn *imp_conn;
605 imp_conn = list_entry(import->imp_conn_list.next,
606 struct obd_import_conn, oic_item);
607 list_del(&imp_conn->oic_item);
608 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
609 OBD_FREE(imp_conn, sizeof(*imp_conn));
612 LASSERT(list_empty(&import->imp_handle.h_link));
613 OBD_FREE(import, sizeof(*import));
616 EXPORT_SYMBOL(class_import_put);
/* Allocate and initialise a new import.
 *
 * The request lists, lock, recovery wait queue and connection list are
 * initialised, the counters are zeroed, and the state is set to
 * LUSTRE_IMP_NEW.  The reference count starts at 2 and the import is
 * added to the handle hash.
 * NOTE(review): the allocation-failure check and the return are not
 * visible in this copy. */
618 struct obd_import *class_new_import(void)
620 struct obd_import *imp;
622 OBD_ALLOC(imp, sizeof(*imp));
626 INIT_LIST_HEAD(&imp->imp_replay_list);
627 INIT_LIST_HEAD(&imp->imp_sending_list);
628 INIT_LIST_HEAD(&imp->imp_delayed_list);
629 spin_lock_init(&imp->imp_lock);
630 imp->imp_conn_cnt = 0;
631 imp->imp_max_transno = 0;
632 imp->imp_peer_committed_transno = 0;
633 imp->imp_state = LUSTRE_IMP_NEW;
634 init_waitqueue_head(&imp->imp_recovery_waitq);
636 atomic_set(&imp->imp_refcount, 2);
637 atomic_set(&imp->imp_inflight, 0);
638 atomic_set(&imp->imp_replay_inflight, 0);
639 INIT_LIST_HEAD(&imp->imp_conn_list);
640 INIT_LIST_HEAD(&imp->imp_handle.h_link);
641 class_handle_hash(&imp->imp_handle, import_handle_addref);
645 EXPORT_SYMBOL(class_new_import);
/* Begin tearing down an import.
 *
 * The import is removed from the handle hash, its generation is bumped so
 * that existing requests on it become invalid, in-flight requests are
 * aborted through ptlrpc_abort_inflight_superhack(), and one reference is
 * dropped.  The import must be neither NULL nor LP_POISON. */
647 void class_destroy_import(struct obd_import *import)
649 LASSERT(import != NULL);
650 LASSERT(import != LP_POISON);
652 class_handle_unhash(&import->imp_handle);
654 /* Abort any inflight DLM requests and NULL out their (about to be
 * freed) import. */
656 /* Invalidate all requests on import, would be better to call
657 ptlrpc_set_import_active(imp, 0); */
658 import->imp_generation++;
659 ptlrpc_abort_inflight_superhack(import);
661 class_import_put(import);
663 EXPORT_SYMBOL(class_destroy_import);
665 /* A connection defines an export context in which preallocation can
666 be managed. This releases the export pointer reference, and returns
667 the export handle, so the export refcount is 1 when this function
    returns. */
/* Create an export for a new client of obd and hand back its handle.
 *
 * conn receives the cookie of the new export's handle, cluuid is copied
 * into exp_client_uuid, and the pointer reference returned by
 * class_new_export() is dropped again before returning.  None of the
 * three arguments may be NULL.
 * NOTE(review): the allocation-failure check and the return value are
 * not visible in this copy. */
669 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
670 struct obd_uuid *cluuid)
672 struct obd_export *export;
673 LASSERT(conn != NULL);
674 LASSERT(obd != NULL);
675 LASSERT(cluuid != NULL);
678 export = class_new_export(obd);
682 conn->cookie = export->exp_handle.h_cookie;
683 memcpy(&export->exp_client_uuid, cluuid,
684 sizeof(export->exp_client_uuid));
685 class_export_put(export);
687 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
688 cluuid->uuid, conn->cookie);
691 EXPORT_SYMBOL(class_connect);
693 /* This function removes two references from the export: one for the
694 * hash entry and one for the export pointer passed in. The export
695 * pointer passed to this function is destroyed and should not be used
 * again. */
/* Disconnect an export.
 *
 * exp_disconnected is tested and set under exp_lock so that concurrent
 * callers can tell whether the export was already disconnected.  For a
 * first disconnect the export is unlinked with class_unlink_export() and
 * one further reference is dropped.
 * NOTE(review): the statements taken for a NULL export and for an
 * already disconnected export, and the return values, are not visible in
 * this copy. */
697 int class_disconnect(struct obd_export *export)
699 int already_disconnected;
702 if (export == NULL) {
704 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
708 spin_lock(&export->exp_lock);
709 already_disconnected = export->exp_disconnected;
710 export->exp_disconnected = 1;
711 spin_unlock(&export->exp_lock);
713 /* class_cleanup(), abort_recovery(), and class_fail_export()
714 * all end up in here, and if any of them race we shouldn't
715 * call extra class_export_puts(). */
716 if (already_disconnected)
719 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
720 export->exp_handle.h_cookie);
722 class_unlink_export(export);
723 class_export_put(export);
/* Disconnect every export on a private work list.
 *
 * The loop always takes the first export on the list, references it and
 * stores flags in exp_flags.  An export whose client uuid equals its own
 * device's uuid is only unlinked from the list, not disconnected.  Any
 * other export is looked up again through its handle cookie and passed
 * to obd_disconnect(); the result is logged.
 * NOTE(review): when the handle lookup yields nothing, the visible code
 * only drops the local reference -- confirm that such an export leaves
 * the list, since the loop runs until the list is empty. */
727 static void class_disconnect_export_list(struct list_head *list, int flags)
730 struct lustre_handle fake_conn;
731 struct obd_export *fake_exp, *exp;
734 /* It's possible that an export may disconnect itself, but
735 * nothing else will be added to this list. */
736 while(!list_empty(list)) {
737 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
738 class_export_get(exp);
739 exp->exp_flags = flags;
741 if (obd_uuid_equals(&exp->exp_client_uuid,
742 &exp->exp_obd->obd_uuid)) {
744 "exp %p export uuid == obd uuid, don't discon\n",
746 /* Need to delete this now so we don't end up pointing
747 * to work_list later when this export is cleaned up. */
748 list_del_init(&exp->exp_obd_chain);
749 class_export_put(exp);
753 fake_conn.cookie = exp->exp_handle.h_cookie;
754 fake_exp = class_conn2export(&fake_conn);
756 class_export_put(exp);
759 fake_exp->exp_flags = flags;
760 rc = obd_disconnect(fake_exp);
761 class_export_put(exp);
763 CDEBUG(D_HA, "disconnecting export %p failed: %d\n",
766 CDEBUG(D_HA, "export %p disconnected\n", exp);
/* Build the export flag word for a device: OBD_OPT_FAILOVER when obd_fail
 * is set, OR-ed with OBD_OPT_FORCE when obd_force is set. */
772 static inline int get_exp_flags_from_obd(struct obd_device *obd)
774 return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
775 (obd->obd_force ? OBD_OPT_FORCE : 0));
/* Disconnect all exports of a device.
 *
 * Under obd_dev_lock a local list head is spliced in next to obd_exports
 * and obd_exports is then unlinked and re-initialised, which moves every
 * export onto the local list in one step.  The local list is then handed
 * to class_disconnect_export_list() with flags derived from the device. */
778 void class_disconnect_exports(struct obd_device *obd)
780 struct list_head work_list;
783 /* Move all of the exports from obd_exports to a work list, en masse. */
784 spin_lock(&obd->obd_dev_lock);
785 list_add(&work_list, &obd->obd_exports);
786 list_del_init(&obd->obd_exports);
787 spin_unlock(&obd->obd_dev_lock);
789 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
790 "disconnecting them\n", obd->obd_minor, obd);
791 class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
794 EXPORT_SYMBOL(class_disconnect_exports);
796 /* Remove exports that have not completed recovery. */
/* Disconnect the exports of a device that still have exp_replay_needed
 * set.
 *
 * Those exports are moved from obd_exports onto a local work list under
 * obd_dev_lock (list_for_each_safe is used because entries are removed
 * during the walk), and the work list is then passed to
 * class_disconnect_export_list(). */
798 void class_disconnect_stale_exports(struct obd_device *obd)
800 struct list_head work_list;
801 struct list_head *pos, *n;
802 struct obd_export *exp;
806 INIT_LIST_HEAD(&work_list);
807 spin_lock(&obd->obd_dev_lock);
808 list_for_each_safe(pos, n, &obd->obd_exports) {
809 exp = list_entry(pos, struct obd_export, exp_obd_chain);
810 if (exp->exp_replay_needed) {
811 list_del(&exp->exp_obd_chain);
812 list_add(&exp->exp_obd_chain, &work_list);
816 spin_unlock(&obd->obd_dev_lock);
818 CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
820 class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
823 EXPORT_SYMBOL(class_disconnect_stale_exports);
/* Allocate and initialise an obd I/O group.
 *
 * The group starts with no pending callbacks, a reference count of 1, an
 * initialised lock and wait queue, and an empty callback list.
 * NOTE(review): the allocation-failure check, the store through oig_out
 * and the return value are not visible in this copy. */
825 int oig_init(struct obd_io_group **oig_out)
827 struct obd_io_group *oig;
830 OBD_ALLOC(oig, sizeof(*oig));
834 spin_lock_init(&oig->oig_lock);
836 oig->oig_pending = 0;
837 atomic_set(&oig->oig_refcount, 1);
838 init_waitqueue_head(&oig->oig_waitq);
839 INIT_LIST_HEAD(&oig->oig_occ_list);
844 EXPORT_SYMBOL(oig_init);
/* Take one reference on an I/O group. */
846 static inline void oig_grab(struct obd_io_group *oig)
848 atomic_inc(&oig->oig_refcount);
/* Drop one reference on an I/O group and free it when that was the
 * last. */
851 void oig_release(struct obd_io_group *oig)
853 if (atomic_dec_and_test(&oig->oig_refcount))
854 OBD_FREE(oig, sizeof(*oig));
856 EXPORT_SYMBOL(oig_release);
/* Add one callback context to an I/O group.
 *
 * occ is appended to the group's oig_occ_list under oig_lock, taken with
 * interrupts saved.
 * NOTE(review): the pending-count update and any condition on occ are
 * not visible in this copy. */
858 void oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
861 CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
862 spin_lock_irqsave(&oig->oig_lock, flags);
865 list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
866 spin_unlock_irqrestore(&oig->oig_lock, flags);
869 EXPORT_SYMBOL(oig_add_one);
/* Record the completion of one member of an I/O group.
 *
 * Under oig_lock the callback context is removed from the group's list,
 * the previous group result is saved for logging, and oig_pending is
 * decremented.  When the count reaches zero or below, the group's wait
 * queue is selected for wakeup after the lock is dropped.  The group
 * result is only considered for update while it is still 0 and rc is
 * non-zero.
 * The debug message reads oig_rc after the lock has been released, which
 * is why it is marked "racey". */
871 void oig_complete_one(struct obd_io_group *oig,
872 struct oig_callback_context *occ, int rc)
875 wait_queue_head_t *wake = NULL;
878 spin_lock_irqsave(&oig->oig_lock, flags);
881 list_del_init(&occ->occ_oig_item);
883 old_rc = oig->oig_rc;
884 if (oig->oig_rc == 0 && rc != 0)
887 if (--oig->oig_pending <= 0)
888 wake = &oig->oig_waitq;
890 spin_unlock_irqrestore(&oig->oig_lock, flags);
892 CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
893 "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
899 EXPORT_SYMBOL(oig_complete_one);
/* Wait condition for oig_wait(): checks under oig_lock whether
 * oig_pending has dropped to zero or below.
 * NOTE(review): the result assignment and return are not visible in this
 * copy. */
901 static int oig_done(struct obd_io_group *oig)
905 spin_lock_irqsave(&oig->oig_lock, flags);
906 if (oig->oig_pending <= 0)
908 spin_unlock_irqrestore(&oig->oig_lock, flags);
/* Interrupt handler installed by oig_wait() through LWI_INTR().
 *
 * data is the I/O group.  Each callback context on the group's list that
 * is not yet marked interrupted is marked, and its occ_interrupted()
 * callback is invoked with oig_lock released.
 * NOTE(review): the restart of the list walk described in the comment
 * below is on lines missing from this copy. */
912 static void interrupted_oig(void *data)
914 struct obd_io_group *oig = data;
915 struct oig_callback_context *occ;
918 spin_lock_irqsave(&oig->oig_lock, flags);
919 /* We need to restart the processing each time we drop the lock, as
920 * it is possible other threads called oig_complete_one() to remove
921 * an entry elsewhere in the list while we dropped lock. We need to
922 * drop the lock because osc_ap_completion() calls oig_complete_one()
923 * which re-gets this lock ;-) as well as a lock ordering issue. */
925 list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
926 if (occ->interrupted)
928 occ->interrupted = 1;
929 spin_unlock_irqrestore(&oig->oig_lock, flags);
930 occ->occ_interrupted(occ);
931 spin_lock_irqsave(&oig->oig_lock, flags);
934 spin_unlock_irqrestore(&oig->oig_lock, flags);
/* Wait until every member of an I/O group has completed.
 *
 * The first wait is interruptible and uses interrupted_oig() as its
 * interrupt handler.  After an -EINTR result the wait info is replaced
 * by a zeroed one and the wait is repeated, because the caller may free
 * state the group still references.  On exit oig_pending is asserted to
 * be exactly 0.
 * NOTE(review): the return statement is not visible in this copy. */
937 int oig_wait(struct obd_io_group *oig)
939 struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
942 CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
945 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
946 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
947 /* we can't continue until the oig has emptied and stopped
948 * referencing state that the caller will free upon return */
950 lwi = (struct l_wait_info){ 0, };
951 } while (rc == -EINTR);
953 LASSERTF(oig->oig_pending == 0,
954 "exiting oig_wait(oig = %p) with %d pending\n", oig,
957 CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
960 EXPORT_SYMBOL(oig_wait);
/* Forcibly disconnect an export, e.g. on eviction.
 *
 * The previous value of exp_failed is sampled under exp_lock so that an
 * export which was already failed is only logged and skipped.  Otherwise
 * the debug log is dumped when obd_dump_on_timeout is set, an extra
 * reference is taken for obd_disconnect() to consume, and the result of
 * the disconnect is logged.
 * NOTE(review): the line that sets exp_failed is not visible in this
 * copy. */
962 void class_fail_export(struct obd_export *exp)
964 int rc, already_failed;
967 spin_lock_irqsave(&exp->exp_lock, flags);
968 already_failed = exp->exp_failed;
970 spin_unlock_irqrestore(&exp->exp_lock, flags);
972 if (already_failed) {
973 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
974 exp, exp->exp_client_uuid.uuid);
978 CDEBUG(D_HA, "disconnecting export %p/%s\n",
979 exp, exp->exp_client_uuid.uuid);
981 if (obd_dump_on_timeout)
982 libcfs_debug_dumplog();
984 /* Most callers into obd_disconnect are removing their own reference
985 * (request, for example) in addition to the one from the hash table.
986 * We don't have such a reference here, so make one. */
987 class_export_get(exp);
988 rc = obd_disconnect(exp);
990 CERROR("disconnecting export %p failed: %d\n", exp, rc);
992 CDEBUG(D_HA, "disconnected export %p/%s\n",
993 exp, exp->exp_client_uuid.uuid);
995 EXPORT_SYMBOL(class_fail_export);
/* Return the peer NID of an export as a string, formatted with
 * libcfs_nid2str(), when the export has a connection.
 * NOTE(review): the value returned for an export without a connection is
 * not visible in this copy. */
997 char *obd_export_nid2str(struct obd_export *exp)
999 if (exp->exp_connection != NULL)
1000 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1004 EXPORT_SYMBOL(obd_export_nid2str);
1006 /* Ping evictor thread */
/* pet_state value set by ping_evictor_stop() to ask the thread to exit. */
1009 #define PET_TERMINATE 2
/* Number of ping_evictor_start() calls not yet matched by a stop. */
1011 static int pet_refcount = 0;
1012 static int pet_state;
/* Wait queue the evictor thread sleeps on. */
1013 static wait_queue_head_t pet_waitq;
/* Export handed to the evictor by ping_evictor_wake(); its obd is the one
 * to scan.  Accessed under pet_lock. */
1014 static struct obd_export *pet_exp = NULL;
1015 static spinlock_t pet_lock = SPIN_LOCK_UNLOCKED;
/* Ask the ping evictor thread to scan the obd that owns exp.
 *
 * Under pet_lock a reference is taken on exp and stored in pet_exp, then
 * the thread is woken through pet_waitq.
 * NOTE(review): the condition for the early unlock (presumably that the
 * evictor is busy or not ready) and the return values are not visible in
 * this copy.  class_update_export_timer() resets the eviction timer when
 * this returns 0. */
1017 static int ping_evictor_wake(struct obd_export *exp)
1019 spin_lock(&pet_lock);
1021 /* eventually the new obd will call here again. */
1022 spin_unlock(&pet_lock);
1026 /* We have to make sure the obd isn't destroyed between now and when
1027 * the ping evictor runs. We'll take a reference here, and drop it
1028 * when we finish in the evictor. We don't really care about this
1029 * export in particular; we just need one to keep the obd alive. */
1030 pet_exp = class_export_get(exp);
1031 spin_unlock(&pet_lock);
1033 wake_up(&pet_waitq);
1037 static int ping_evictor_main(void *arg)
1039 struct obd_device *obd;
1040 struct obd_export *exp;
1041 struct l_wait_info lwi = { 0 };
1043 unsigned long flags;
1048 /* ptlrpc_daemonize() */
1050 lustre_daemonize_helper();
1051 set_fs_pwd(current->fs, init_task.fs->pwdmnt, init_task.fs->pwd);
1052 exit_files(current);
1054 THREAD_NAME(current->comm, sizeof(current->comm), "ping_evictor");
1056 SIGNAL_MASK_LOCK(current, flags);
1057 sigfillset(¤t->blocked);
1059 SIGNAL_MASK_UNLOCK(current, flags);
1062 CDEBUG(D_HA, "Starting Ping Evictor\n");
1064 pet_state = PET_READY;
1066 l_wait_event(pet_waitq, pet_exp ||
1067 (pet_state == PET_TERMINATE), &lwi);
1068 if (pet_state == PET_TERMINATE)
1071 /* we only get here if pet_exp != NULL, and the end of this
1072 * loop is the only place which sets it NULL again, so lock
1073 * is not strictly necessary. */
1074 spin_lock(&pet_lock);
1075 obd = pet_exp->exp_obd;
1076 spin_unlock(&pet_lock);
1078 expire_time = CURRENT_SECONDS - (3 * obd_timeout / 2);
1080 CDEBUG(D_HA, "evicting all exports of obd %s older than %ld\n",
1081 obd->obd_name, expire_time);
1083 /* Exports can't be deleted out of the list while we hold
1084 * the obd lock (class_unlink_export), which means we can't
1085 * lose the last ref on the export. If they've already been
1086 * removed from the list, we won't find them here. */
1087 spin_lock(&obd->obd_dev_lock);
1088 while (!list_empty(&obd->obd_exports_timed)) {
1089 exp = list_entry(obd->obd_exports_timed.next,
1090 struct obd_export,exp_obd_chain_timed);
1092 if (expire_time > exp->exp_last_request_time) {
1093 class_export_get(exp);
1094 spin_unlock(&obd->obd_dev_lock);
1095 LCONSOLE_WARN("%s: haven't heard from %s in %ld"
1096 " seconds. Last request was at %ld. "
1097 "I think it's dead, and I am evicting "
1098 "it.\n", obd->obd_name,
1099 obd_export_nid2str(exp),
1100 (long)(CURRENT_SECONDS -
1101 exp->exp_last_request_time),
1102 exp->exp_last_request_time);
1105 class_fail_export(exp);
1106 class_export_put(exp);
1108 spin_lock(&obd->obd_dev_lock);
1110 /* List is sorted, so everyone below is ok */
1114 spin_unlock(&obd->obd_dev_lock);
1116 class_export_put(pet_exp);
1118 spin_lock(&pet_lock);
1120 spin_unlock(&pet_lock);
1122 CDEBUG(D_HA, "Exiting Ping Evictor\n");
/* Start the ping evictor thread on the first of possibly several start
 * requests.
 *
 * pet_refcount counts the requests.  The wait queue is initialised and
 * ping_evictor_main() is spawned with kernel_thread(); a spawn failure is
 * logged.
 * NOTE(review): the statement taken when the count was already positive
 * is not visible in this copy -- presumably an early return.  The counter
 * is a plain int modified without a visible lock; confirm that callers
 * serialise start and stop. */
1127 void ping_evictor_start(void)
1131 if (++pet_refcount > 1)
1134 init_waitqueue_head(&pet_waitq);
1136 rc = kernel_thread(ping_evictor_main, NULL, CLONE_VM | CLONE_FS);
1139 CERROR("Cannot start ping evictor thread: %d\n", rc);
1142 EXPORT_SYMBOL(ping_evictor_start);
/* Stop the ping evictor thread once the last start request is released.
 *
 * Sets pet_state to PET_TERMINATE and wakes the thread.
 * NOTE(review): the statement taken while references remain is not
 * visible in this copy -- presumably an early return.  Nothing visible
 * here waits for the thread to exit. */
1144 void ping_evictor_stop(void)
1146 if (--pet_refcount > 0)
1149 pet_state = PET_TERMINATE;
1150 wake_up(&pet_waitq);
1152 EXPORT_SYMBOL(ping_evictor_stop);
/* Non-kernel build: there is no evictor thread, so the wake request
 * always evaluates to 1. */
1153 #else /* !__KERNEL__ */
1154 #define ping_evictor_wake(exp) 1
1157 /* This function makes sure dead exports are evicted in a timely manner.
1158 This function is only called when some export receives a message (i.e.,
1159 the network is up.)
     exp is the export that was just heard from; extra_delay is added to
     the current time when its last-request time is refreshed, and to the
     expiry thresholds below.  The export is moved to the tail of its
     device's obd_exports_timed list, and the head of that list supplies
     the oldest request time. */
1160 void class_update_export_timer(struct obd_export *exp, time_t extra_delay)
1162 struct obd_export *oldest_exp;
1169 /* Compensate for slow machines, etc, by faking our request time
1170 into the future. Although this can break the strict time-ordering
1171 of the list, we can be really lazy here - we don't have to evict
1172 at the exact right moment. Eventually, all silent exports
1173 will make it to the top of the list. */
1174 exp->exp_last_request_time = max(exp->exp_last_request_time,
1175 (time_t)CURRENT_SECONDS + extra_delay);
1177 CDEBUG(D_INFO, "updating export %s at %ld\n",
1178 exp->exp_client_uuid.uuid,
1179 exp->exp_last_request_time);
1181 /* exports may get disconnected from the chain even though the
1182 export has references, so we must keep the spin lock while
1183 manipulating the lists */
1184 spin_lock(&exp->exp_obd->obd_dev_lock);
1186 if (list_empty(&exp->exp_obd_chain_timed)) {
1187 /* this one is not timed */
1188 spin_unlock(&exp->exp_obd->obd_dev_lock);
1193 list_move_tail(&exp->exp_obd_chain_timed,
1194 &exp->exp_obd->obd_exports_timed);
1196 oldest_exp = list_entry(exp->exp_obd->obd_exports_timed.next,
1197 struct obd_export, exp_obd_chain_timed);
1198 oldest_time = oldest_exp->exp_last_request_time;
1199 spin_unlock(&exp->exp_obd->obd_dev_lock);
1201 if (exp->exp_obd->obd_recovering) {
1202 /* be nice to everyone during recovery */
1207 /* Note - racing to start/reset the obd_eviction timer is safe */
1208 if (exp->exp_obd->obd_eviction_timer == 0) {
1209 /* Check if the oldest entry is expired. */
1210 if (CURRENT_SECONDS > (oldest_time +
1211 (3 * obd_timeout / 2) + extra_delay)) {
1212 /* We need a second timer, in case the net was down and
1213 * it just came back. Since the pinger may skip every
1214 * other PING_INTERVAL (see note in ptlrpc_pinger_main),
1215 * we better wait for 3. */
1216 exp->exp_obd->obd_eviction_timer = CURRENT_SECONDS +
1218 CDEBUG(D_HA, "%s: Think about evicting %s from %ld\n",
1219 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1223 if (CURRENT_SECONDS > (exp->exp_obd->obd_eviction_timer +
/* NOTE(review): the comment that follows has lost its closing line in
 * this copy and needs a terminator before the code below it can compile. */
1225 /* The evictor won't evict anyone who we've heard from
1226 * recently, so we don't have to check before we start
1228 if (!ping_evictor_wake(exp))
1229 exp->exp_obd->obd_eviction_timer = 0;
1235 EXPORT_SYMBOL(class_update_export_timer);
/* Evict every export of obd whose peer NID string equals nid.
 *
 * Matching exports are collected under obd_dev_lock, with a reference
 * taken on each, in batches of at most EVICT_BATCH.  Each collected
 * export is then failed with class_fail_export() and its reference
 * dropped, outside the lock.  An error is logged when nothing was
 * evicted.
 * Returns the number of exports evicted.
 * NOTE(review): the handling of a full batch (presumably another search
 * pass) is on lines missing from this copy. */
1237 #define EVICT_BATCH 32
1238 int obd_export_evict_by_nid(struct obd_device *obd, char *nid)
1240 struct obd_export *doomed_exp[EVICT_BATCH] = { NULL };
1241 struct list_head *p;
1242 int exports_evicted = 0, num_to_evict = 0, i;
1245 spin_lock(&obd->obd_dev_lock);
1246 list_for_each(p, &obd->obd_exports) {
1247 doomed_exp[num_to_evict] = list_entry(p, struct obd_export,
1249 if (strcmp(obd_export_nid2str(doomed_exp[num_to_evict]), nid)
1251 class_export_get(doomed_exp[num_to_evict]);
1252 if (++num_to_evict == EVICT_BATCH)
1256 spin_unlock(&obd->obd_dev_lock);
1258 for (i = 0; i < num_to_evict; i++) {
1260 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1261 obd->obd_name, nid, doomed_exp[i]->exp_client_uuid.uuid,
1263 class_fail_export(doomed_exp[i]);
1264 class_export_put(doomed_exp[i]);
1266 if (num_to_evict == EVICT_BATCH) {
1271 if (!exports_evicted)
1272 CERROR("%s: can't disconnect NID '%s': no exports found\n",
1273 obd->obd_name, nid);
1274 return exports_evicted;
1276 EXPORT_SYMBOL(obd_export_evict_by_nid);
/* Evict the export of obd whose client uuid equals the given string.
 *
 * The string is converted with obd_str2uuid() and the device's
 * obd_exports list is searched under obd_dev_lock.  A reference is taken
 * on the matching export.  It is then failed with class_fail_export()
 * and the reference dropped, outside the lock.  An error is logged when
 * no export is found.
 * Returns the number of exports evicted.
 * NOTE(review): the lines that end the search and that count the
 * eviction are not visible in this copy. */
1278 int obd_export_evict_by_uuid(struct obd_device *obd, char *uuid)
1280 struct obd_export *doomed_exp = NULL;
1281 struct list_head *p;
1282 struct obd_uuid doomed;
1283 int exports_evicted = 0;
1285 obd_str2uuid(&doomed, uuid);
1287 spin_lock(&obd->obd_dev_lock);
1288 list_for_each(p, &obd->obd_exports) {
1289 doomed_exp = list_entry(p, struct obd_export, exp_obd_chain);
1291 if (obd_uuid_equals(&doomed, &doomed_exp->exp_client_uuid)) {
1292 class_export_get(doomed_exp);
1297 spin_unlock(&obd->obd_dev_lock);
1299 if (doomed_exp == NULL) {
1300 CERROR("%s: can't disconnect %s: no exports found\n",
1301 obd->obd_name, uuid);
1303 CWARN("%s: evicting %s at adminstrative request\n",
1304 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1305 class_fail_export(doomed_exp);
1306 class_export_put(doomed_exp);
1310 return exports_evicted;
1312 EXPORT_SYMBOL(obd_export_evict_by_uuid);