1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 * These are the only exported functions; they provide some generic
22 * infrastructure for managing object devices.
 */
25 #define DEBUG_SUBSYSTEM S_CLASS
27 #include <linux/kmod.h> /* for request_module() */
28 #include <linux/module.h>
29 #include <linux/obd_class.h>
30 #include <linux/lustre_mds.h>
31 #include <linux/obd_ost.h>
32 #include <linux/random.h>
33 #include <linux/slab.h>
34 #include <linux/pagemap.h>
35 #include <linux/quota.h>
37 #include <liblustre.h>
38 #include <linux/obd_class.h>
39 #include <linux/obd.h>
41 #include <linux/lprocfs_status.h>
42 #include <linux/lustre_quota.h>
/* List of registered obd types; defined elsewhere, walked and modified here. */
44 extern struct list_head obd_types;
/* Taken around every walk or modification of obd_types in this file. */
45 static spinlock_t obd_types_lock = SPIN_LOCK_UNLOCKED;
/* Slab caches assigned in obd_init_caches() and destroyed in
 * obd_cleanup_caches(). */
46 kmem_cache_t *obdo_cachep = NULL;
47 kmem_cache_t *import_cachep = NULL;
/* Quota-unit slab cache and hash buckets; see obd_init_qunit_cache() and
 * obd_cleanup_qunit_cache(). */
49 kmem_cache_t *qunit_cachep = NULL;
50 struct list_head qunit_hash[NR_DQHASH];
/* Held while the qunit_hash buckets are initialised or checked for emptiness. */
51 spinlock_t qunit_hash_lock = SPIN_LOCK_UNLOCKED;
/* Function pointers called from the export/import teardown paths below.
 * NOTE(review): where they are assigned is not visible in this file. */
53 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
54 void (*ptlrpc_abort_inflight_superhack)(struct obd_import *imp);
/*
57 * Support functions: we could use inter-module communication, but this
58 * is more portable to other OSes.
 */
/*
 * Look up a registered obd_type by name.
 *
 * Walks the global obd_types list under obd_types_lock and compares each
 * entry's typ_name with @name using strcmp(). The lock is released as soon
 * as a match is found, and again once the walk ends without one.
 *
 * NOTE(review): the return statements are not visible in this listing;
 * presumably the matching type is returned, or NULL when nothing matches --
 * confirm against the full file.
 */
60 static struct obd_type *class_search_type(char *name)
62 struct list_head *tmp;
63 struct obd_type *type;
65 spin_lock(&obd_types_lock);
66 list_for_each(tmp, &obd_types) {
67 type = list_entry(tmp, struct obd_type, typ_chain);
68 if (strcmp(type->typ_name, name) == 0) {
/* match: drop the lock before leaving the loop body */
69 spin_unlock(&obd_types_lock);
/* no match found */
73 spin_unlock(&obd_types_lock);
/*
 * Find the obd type called @name, with a module-load fallback.
 *
 * When request_module(@name) returns 0 the type list is searched a second
 * time; otherwise the failure is only logged. A module reference is then
 * taken on the owner of the type's operations table.
 *
 * NOTE(review): the conditions guarding these statements and the return
 * statement are not visible in this listing. On the visible line the result
 * of try_module_get() is not checked.
 */
77 struct obd_type *class_get_type(char *name)
79 struct obd_type *type = class_search_type(name);
83 if (!request_module(name)) {
84 CDEBUG(D_INFO, "Loaded module '%s'\n", name);
/* the freshly loaded module should have registered the type */
85 type = class_search_type(name);
87 CDEBUG(D_INFO, "Can't load module '%s'\n", name);
91 try_module_get(type->typ_ops->o_owner);
/*
 * Drop the module reference on the owner of @type's operations table; the
 * counterpart of the try_module_get() in class_get_type().
 */
95 void class_put_type(struct obd_type *type)
98 module_put(type->typ_ops->o_owner);
/*
 * Register a new obd type under the name `name`.
 *
 * A name that class_search_type() already finds is logged as already
 * registered. Otherwise an obd_type is allocated together with private
 * copies of *ops and of the name string, a procfs entry is registered for
 * the type under proc_lustre_root, and the type is linked onto obd_types
 * under obd_types_lock.
 *
 * The statements from the "typ_name != NULL" test onwards free whatever was
 * allocated. NOTE(review): presumably this is the failure path -- the
 * labels, gotos and return statements are not visible in this listing.
 */
101 int class_register_type(struct obd_ops *ops, struct lprocfs_vars *vars,
104 struct obd_type *type;
108 LASSERT(strnlen(name, 1024) < 1024); /* sanity check */
110 if (class_search_type(name)) {
111 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
116 OBD_ALLOC(type, sizeof(*type));
120 OBD_ALLOC(type->typ_ops, sizeof(*type->typ_ops));
121 OBD_ALLOC(type->typ_name, strlen(name) + 1);
122 if (type->typ_ops == NULL || type->typ_name == NULL)
/* the type keeps its own copies, so the caller's ops and name may go away */
125 *(type->typ_ops) = *ops;
126 strcpy(type->typ_name, name);
129 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
132 if (IS_ERR(type->typ_procroot)) {
133 rc = PTR_ERR(type->typ_procroot);
/* do not leave an ERR_PTR value behind in the structure */
134 type->typ_procroot = NULL;
138 spin_lock(&obd_types_lock);
139 list_add(&type->typ_chain, &obd_types);
140 spin_unlock(&obd_types_lock);
/* release path: free only the pieces that were actually allocated */
145 if (type->typ_name != NULL)
146 OBD_FREE(type->typ_name, strlen(name) + 1);
147 if (type->typ_ops != NULL)
148 OBD_FREE (type->typ_ops, sizeof (*type->typ_ops));
149 OBD_FREE(type, sizeof(*type));
/*
 * Unregister the obd type called @name and free it.
 *
 * An unknown name is reported with CERROR. A type whose typ_refcnt is still
 * non-zero is reported and has only its ops table freed. Otherwise the
 * procfs entry is removed, the type is unlinked from obd_types under
 * obd_types_lock, and its name, ops table and the type itself are freed.
 *
 * NOTE(review): the return statements are not visible in this listing.
 */
153 int class_unregister_type(char *name)
155 struct obd_type *type = class_search_type(name);
159 CERROR("unknown obd type\n");
163 if (type->typ_refcnt) {
164 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
165 /* This is a bad situation, let's make the best of it */
166 /* Remove ops, but leave the name for debugging */
167 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
171 if (type->typ_procroot) {
172 lprocfs_remove(type->typ_procroot);
173 type->typ_procroot = NULL;
176 spin_lock(&obd_types_lock);
177 list_del(&type->typ_chain);
178 spin_unlock(&obd_types_lock);
/* name matched typ_name via strcmp(), so both strings have the same length */
179 OBD_FREE(type->typ_name, strlen(name) + 1);
180 if (type->typ_ops != NULL)
181 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
182 OBD_FREE(type, sizeof(*type));
184 } /* class_unregister_type */
/*
 * Claim a slot in obd_dev[] for a new device of @type called @name.
 *
 * All MAX_OBD_DEVICES slots are scanned under obd_dev_lock. A slot whose
 * obd_name already equals @name is reported as a duplicate, and a slot
 * claimed earlier in the scan is released again by clearing its obd_type
 * and obd_name. The first slot without an obd_type (while no result is
 * held) is zeroed and given @type and @name.
 *
 * @name is stored by pointer, not copied.
 *
 * NOTE(review): the assignment of `result`, the restoration of obd_minor
 * after the memset, and the return statement are not visible in this
 * listing.
 */
186 struct obd_device *class_newdev(struct obd_type *type, char *name)
188 struct obd_device *result = NULL;
191 spin_lock(&obd_dev_lock);
192 for (i = 0 ; i < MAX_OBD_DEVICES; i++) {
193 struct obd_device *obd = &obd_dev[i];
194 if (obd->obd_name && (strcmp(name, obd->obd_name) == 0)) {
195 CERROR("Device %s already exists, won't add\n", name);
/* undo a claim made earlier in this scan */
197 result->obd_type = NULL;
198 result->obd_name = NULL;
/* first free slot: only taken while no slot has been claimed yet */
203 if (!result && !obd->obd_type) {
204 LASSERT(obd->obd_minor == i);
205 memset(obd, 0, sizeof(*obd));
207 obd->obd_type = type;
208 obd->obd_name = name;
209 CDEBUG(D_IOCTL, "Adding new device %s\n",
214 spin_unlock(&obd_dev_lock);
/*
 * Return an obd_dev[] slot to the free pool.
 *
 * Under obd_dev_lock the whole structure is overwritten with the 0x5a poison
 * byte, then obd_type and obd_name are reset to NULL and the minor number
 * saved on entry is written back. A NULL obd_type is what class_newdev()
 * tests for when looking for a free slot.
 */
218 void class_release_dev(struct obd_device *obd)
/* remember the minor before the structure is poisoned */
220 int minor = obd->obd_minor;
222 spin_lock(&obd_dev_lock);
223 memset(obd, 0x5a, sizeof(*obd));
224 obd->obd_type = NULL;
225 obd->obd_minor = minor;
226 obd->obd_name = NULL;
227 spin_unlock(&obd_dev_lock);
/*
 * Find the obd_dev[] slot whose obd_name equals @name.
 *
 * The scan runs under obd_dev_lock. A name match is only acted upon when the
 * device has obd_attached set; the lock is dropped on that path and again
 * after the loop.
 *
 * NOTE(review): the returned values are not visible in this listing;
 * class_name2obd() uses the result as an index into obd_dev[].
 */
230 int class_name2dev(char *name)
237 spin_lock(&obd_dev_lock);
238 for (i = 0; i < MAX_OBD_DEVICES; i++) {
239 struct obd_device *obd = &obd_dev[i];
240 if (obd->obd_name && strcmp(name, obd->obd_name) == 0) {
241 /* Make sure we finished attaching before we give
242 out any references */
243 if (obd->obd_attached) {
244 spin_unlock(&obd_dev_lock);
250 spin_unlock(&obd_dev_lock);
/*
 * Return the obd_dev[] entry at the index class_name2dev() produced for
 * @name.
 *
 * NOTE(review): the handling of a failed lookup is not visible in this
 * listing.
 */
255 struct obd_device *class_name2obd(char *name)
257 int dev = class_name2dev(name);
260 return &obd_dev[dev];
/*
 * Find the obd_dev[] slot whose obd_uuid equals @uuid.
 *
 * The scan runs under obd_dev_lock and compares with obd_uuid_equals(); the
 * lock is dropped on a match and again after the loop.
 *
 * NOTE(review): the returned values are not visible in this listing;
 * class_uuid2obd() uses the result as an index into obd_dev[].
 */
263 int class_uuid2dev(struct obd_uuid *uuid)
267 spin_lock(&obd_dev_lock);
268 for (i = 0; i < MAX_OBD_DEVICES; i++) {
269 struct obd_device *obd = &obd_dev[i];
270 if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
271 spin_unlock(&obd_dev_lock);
275 spin_unlock(&obd_dev_lock);
/*
 * Return the obd_dev[] entry at the index class_uuid2dev() produced for
 * @uuid.
 *
 * NOTE(review): the handling of a failed lookup is not visible in this
 * listing.
 */
280 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
282 int dev = class_uuid2dev(uuid);
285 return &obd_dev[dev];
288 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
289 specified, then only the client with that uuid is returned,
290 otherwise any client connected to the tgt is returned. */
/*
 * The scan runs under obd_dev_lock. The device's type name is compared with
 * typ_name using strncmp() bounded by strlen(typ_name), so any type whose
 * name begins with typ_name is considered.
 *
 * NOTE(review): cli->cl_import is dereferenced without a visible NULL check,
 * and the return statements are not visible in this listing.
 */
291 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
293 struct obd_uuid *grp_uuid)
297 spin_lock(&obd_dev_lock);
298 for (i = 0; i < MAX_OBD_DEVICES; i++) {
299 struct obd_device *obd = &obd_dev[i];
300 if (obd->obd_type == NULL)
302 if ((strncmp(obd->obd_type->typ_name, typ_name,
303 strlen(typ_name)) == 0)) {
304 struct client_obd *cli = &obd->u.cli;
305 struct obd_import *imp = cli->cl_import;
/* target must match; the group is only checked when one was supplied */
306 if (obd_uuid_equals(tgt_uuid, &imp->imp_target_uuid) &&
307 ((grp_uuid)? obd_uuid_equals(grp_uuid,
308 &obd->obd_uuid) : 1)) {
309 spin_unlock(&obd_dev_lock);
314 spin_unlock(&obd_dev_lock);
/*
 * Find a client obd connected to @tgt_uuid without the caller naming a
 * type: an MDC lookup is made first, then an OSC lookup.
 *
 * The MDC lookup passes NULL as the group uuid, so @grp_uuid does not filter
 * it.
 *
 * NOTE(review): the condition between the two lookups, the last argument of
 * the second call and the return statement are not visible in this listing.
 */
319 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
320 struct obd_uuid *grp_uuid)
322 struct obd_device *obd;
324 obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
326 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
331 /* Iterate the obd_device list looking devices have grp_uuid. Start
332 searching at *next, and if a device is found, the next index to look
333 at is saved in *next. If next is NULL, then the first matching device
334 will always be returned. */
/*
 * The scan runs under obd_dev_lock and compares each device's obd_uuid with
 * grp_uuid using obd_uuid_equals(). *next is only used as a starting index
 * when it lies in [0, MAX_OBD_DEVICES).
 *
 * NOTE(review): the handling of slots with a NULL obd_type, the update of
 * *next and the return statements are not visible in this listing.
 */
335 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
341 else if (*next >= 0 && *next < MAX_OBD_DEVICES)
346 spin_lock(&obd_dev_lock);
347 for (; i < MAX_OBD_DEVICES; i++) {
348 struct obd_device *obd = &obd_dev[i];
349 if (obd->obd_type == NULL)
351 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
354 spin_unlock(&obd_dev_lock);
358 spin_unlock(&obd_dev_lock);
/*
 * Tear down the quota-unit cache.
 *
 * Asserts, under qunit_hash_lock, that every qunit_hash bucket is empty, and
 * then asserts that kmem_cache_destroy() on qunit_cachep returns 0.
 *
 * NOTE(review): any guard around the destroy call is not visible in this
 * listing.
 */
363 static void obd_cleanup_qunit_cache(void)
368 spin_lock(&qunit_hash_lock);
369 for (i = 0; i < NR_DQHASH; i++)
370 LASSERT(list_empty(qunit_hash + i));
371 spin_unlock(&qunit_hash_lock);
374 LASSERTF(kmem_cache_destroy(qunit_cachep) == 0,
375 "Cannot destroy ll_qunit_cache\n");
/*
 * Destroy the slab caches created by obd_init_caches().
 *
 * Asserts that kmem_cache_destroy() returns 0 for the obdo and import
 * caches, then tears down the quota-unit cache through
 * obd_cleanup_qunit_cache().
 *
 * The assertion messages previously read "destory"; they now say "destroy",
 * matching the wording used for the qunit cache.
 *
 * NOTE(review): the guards around each destroy call are not visible in this
 * listing.
 */
381 void obd_cleanup_caches(void)
385 LASSERTF(kmem_cache_destroy(obdo_cachep) == 0,
386 "Cannot destroy ll_obdo_cache\n");
390 LASSERTF(kmem_cache_destroy(import_cachep) == 0,
391 "Cannot destroy ll_import_cache\n");
392 import_cachep = NULL;
394 obd_cleanup_qunit_cache();
/*
 * Create the quota-unit slab cache and initialise the qunit hash table.
 *
 * Asserts that the cache does not exist yet, creates "ll_qunit_cache" sized
 * for struct lustre_qunit, and initialises every qunit_hash bucket as an
 * empty list under qunit_hash_lock.
 *
 * NOTE(review): the failure check on kmem_cache_create() and the return
 * statements are not visible in this listing.
 */
398 static int obd_init_qunit_cache(void)
403 LASSERT(qunit_cachep == NULL);
404 qunit_cachep = kmem_cache_create("ll_qunit_cache",
405 sizeof(struct lustre_qunit),
410 spin_lock(&qunit_hash_lock);
411 for (i = 0; i < NR_DQHASH; i++)
412 INIT_LIST_HEAD(qunit_hash + i);
413 spin_unlock(&qunit_hash_lock);
/*
 * Create the slab caches used by this file: "ll_obdo_cache" (struct obdo),
 * "ll_import_cache" (struct obd_import) and, through obd_init_qunit_cache(),
 * the quota-unit cache. Each cache pointer is asserted to be NULL before it
 * is assigned.
 *
 * NOTE(review): the call to obd_cleanup_caches() is presumably the failure
 * path; the checks, labels and return statements are not visible in this
 * listing.
 */
417 int obd_init_caches(void)
422 LASSERT(obdo_cachep == NULL);
423 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
428 LASSERT(import_cachep == NULL);
429 import_cachep = kmem_cache_create("ll_import_cache",
430 sizeof(struct obd_import),
435 rc = obd_init_qunit_cache();
441 obd_cleanup_caches();
446 /* map connection to client */
/*
 * Resolves the handle cookie in @conn with class_handle2object(). Two cases
 * are logged before the lookup: a null handle, and a cookie of -1, which
 * means a new connection is to be assigned.
 *
 * NOTE(review): the conditions and return statements for those two cases
 * are not visible in this listing. class_conn2obd() drops a reference on the
 * returned export, which suggests the lookup returns a referenced object --
 * confirm.
 */
447 struct obd_export *class_conn2export(struct lustre_handle *conn)
449 struct obd_export *export;
453 CDEBUG(D_CACHE, "looking for null handle\n");
457 if (conn->cookie == -1) { /* this means assign a new connection */
458 CDEBUG(D_CACHE, "want a new connection\n");
462 CDEBUG(D_IOCTL, "looking for export cookie "LPX64"\n", conn->cookie);
463 export = class_handle2object(conn->cookie);
/*
 * NOTE(review): only the signature is visible in this listing. Presumably
 * this returns the obd_device behind @exp -- confirm against the full file.
 */
467 struct obd_device *class_exp2obd(struct obd_export *exp)
/*
 * Map a connection handle to its obd device: the export is looked up with
 * class_conn2export(), its exp_obd is read, and the export reference is
 * dropped again with class_export_put().
 *
 * NOTE(review): the NULL check on the export and the return statements are
 * not visible in this listing.
 */
474 struct obd_device *class_conn2obd(struct lustre_handle *conn)
476 struct obd_export *export;
477 export = class_conn2export(conn);
479 struct obd_device *obd = export->exp_obd;
/* only the obd pointer is needed; release the lookup's export reference */
480 class_export_put(export);
/*
 * Return the client import (u.cli.cl_import) of the obd device behind @exp.
 *
 * NOTE(review): any NULL check on the obd is not visible in this listing.
 */
486 struct obd_import *class_exp2cliimp(struct obd_export *exp)
488 struct obd_device *obd = exp->exp_obd;
491 return obd->u.cli.cl_import;
/*
 * Return the client import (u.cli.cl_import) of the obd device that
 * class_conn2obd() finds for @conn.
 *
 * NOTE(review): any NULL check on the obd is not visible in this listing.
 */
494 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
496 struct obd_device *obd = class_conn2obd(conn);
499 return obd->u.cli.cl_import;
502 /* Export management functions */
/*
 * Callback handed to class_handle_hash() in class_new_export(); it takes a
 * reference on the export with class_export_get().
 */
503 static void export_handle_addref(void *export)
505 class_export_get(export);
/*
 * Drop one reference on @exp and destroy it when the count reaches zero.
 *
 * Destruction releases the export's connection (if it has one), asserts that
 * the export has no outstanding replies and is no longer linked on its
 * handle list, calls obd_destroy_export() and frees the structure.
 */
508 void __class_export_put(struct obd_export *exp)
510 if (atomic_dec_and_test(&exp->exp_refcount)) {
511 struct obd_device *obd = exp->exp_obd;
512 CDEBUG(D_IOCTL, "destroying export %p/%s\n", exp,
513 exp->exp_client_uuid.uuid);
515 LASSERT(obd != NULL);
517 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
518 if (exp->exp_connection)
519 ptlrpc_put_connection_superhack(exp->exp_connection);
521 LASSERT(list_empty(&exp->exp_outstanding_replies));
/* the handle must already have been unhashed (see class_unlink_export()) */
522 LASSERT(list_empty(&exp->exp_handle.h_link));
523 obd_destroy_export(exp);
525 OBD_FREE(exp, sizeof(*exp));
530 /* Creates a new export, adds it to the hash table, and returns a
531 * pointer to it. The refcount is 2: one for the hash reference, and
532 * one for the pointer returned by this function. */
/*
 * Besides hashing the handle, the export is linked onto the obd's
 * obd_exports and obd_exports_timed lists under obd->obd_dev_lock, where
 * obd_refcount and obd_num_exports are incremented as well.
 * obd_init_export() is called after the lock is released.
 *
 * NOTE(review): the allocation-failure test and the return statements are
 * not visible in this listing.
 */
533 struct obd_export *class_new_export(struct obd_device *obd)
535 struct obd_export *export;
537 OBD_ALLOC(export, sizeof(*export));
539 CERROR("no memory! (minor %d)\n", obd->obd_minor);
543 export->exp_conn_cnt = 0;
544 atomic_set(&export->exp_refcount, 2);
545 export->exp_obd = obd;
546 INIT_LIST_HEAD(&export->exp_outstanding_replies);
547 /* XXX this should be in LDLM init */
548 INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
550 INIT_LIST_HEAD(&export->exp_handle.h_link);
551 class_handle_hash(&export->exp_handle, export_handle_addref);
552 export->exp_last_request_time = CURRENT_SECONDS;
553 spin_lock_init(&export->exp_lock);
555 spin_lock(&obd->obd_dev_lock);
556 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
557 atomic_inc(&obd->obd_refcount);
558 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
/* appended at the tail of the timed list */
559 list_add_tail(&export->exp_obd_chain_timed,
560 &export->exp_obd->obd_exports_timed);
561 export->exp_obd->obd_num_exports++;
562 spin_unlock(&obd->obd_dev_lock);
564 obd_init_export(export);
/*
 * Make @exp unreachable and drop one reference on it.
 *
 * The handle is removed from the handle hash. Under the obd's obd_dev_lock
 * the export is taken off both the obd_exports and obd_exports_timed lists
 * (with list_del_init(), so the links read as empty afterwards) and
 * obd_num_exports is decremented. class_export_put() is called last.
 */
568 void class_unlink_export(struct obd_export *exp)
570 class_handle_unhash(&exp->exp_handle);
572 spin_lock(&exp->exp_obd->obd_dev_lock);
573 list_del_init(&exp->exp_obd_chain);
574 list_del_init(&exp->exp_obd_chain_timed);
575 exp->exp_obd->obd_num_exports--;
576 spin_unlock(&exp->exp_obd->obd_dev_lock);
578 class_export_put(exp);
581 /* Import management functions */
/*
 * Callback handed to class_handle_hash() in class_new_import(); it takes a
 * reference on the import with class_import_get().
 */
582 static void import_handle_addref(void *import)
584 class_import_get(import);
/*
 * Take a reference on @import.
 *
 * The current count is first asserted to be non-negative and below 0x5a5a5a.
 * The new count is logged after the increment.
 *
 * NOTE(review): 0x5a is the poison byte class_release_dev() writes, so the
 * upper bound presumably catches use of a freed import -- confirm. The
 * return statement is not visible in this listing.
 */
587 struct obd_import *class_import_get(struct obd_import *import)
589 LASSERT(atomic_read(&import->imp_refcount) >= 0);
590 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
591 atomic_inc(&import->imp_refcount);
592 CDEBUG(D_IOCTL, "import %p refcount=%d\n", import,
593 atomic_read(&import->imp_refcount));
/*
 * Drop one reference on @import and destroy it when the count reaches zero.
 *
 * Destruction releases imp_connection, then unlinks and frees every
 * obd_import_conn on imp_conn_list (releasing each entry's oic_conn),
 * asserts that the handle is no longer linked on its handle list, and frees
 * the import.
 *
 * NOTE(review): the early exit taken while references remain is not visible
 * in this listing.
 */
597 void class_import_put(struct obd_import *import)
/* logs the value the count will have after this put */
601 CDEBUG(D_IOCTL, "import %p refcount=%d\n", import,
602 atomic_read(&import->imp_refcount) - 1);
604 LASSERT(atomic_read(&import->imp_refcount) > 0);
605 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
606 if (!atomic_dec_and_test(&import->imp_refcount)) {
611 CDEBUG(D_IOCTL, "destroying import %p\n", import);
613 ptlrpc_put_connection_superhack(import->imp_connection);
/* drain the list of alternate connections */
615 while (!list_empty(&import->imp_conn_list)) {
616 struct obd_import_conn *imp_conn;
618 imp_conn = list_entry(import->imp_conn_list.next,
619 struct obd_import_conn, oic_item);
620 list_del(&imp_conn->oic_item);
621 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
622 OBD_FREE(imp_conn, sizeof(*imp_conn));
625 LASSERT(list_empty(&import->imp_handle.h_link));
626 OBD_FREE(import, sizeof(*import));
/*
 * Allocate and initialise a new import.
 *
 * The replay, sending, delayed and connection lists are initialised empty,
 * the counters are zeroed, the state is set to LUSTRE_IMP_NEW and the handle
 * is entered into the handle hash. The reference count starts at 2.
 *
 * NOTE(review): presumably one reference is for the hash and one for the
 * caller, as documented for class_new_export() -- confirm. The
 * allocation-failure test and the return statements are not visible in this
 * listing.
 */
630 struct obd_import *class_new_import(void)
632 struct obd_import *imp;
634 OBD_ALLOC(imp, sizeof(*imp));
638 INIT_LIST_HEAD(&imp->imp_replay_list);
639 INIT_LIST_HEAD(&imp->imp_sending_list);
640 INIT_LIST_HEAD(&imp->imp_delayed_list);
641 spin_lock_init(&imp->imp_lock);
642 imp->imp_conn_cnt = 0;
643 imp->imp_max_transno = 0;
644 imp->imp_peer_committed_transno = 0;
645 imp->imp_state = LUSTRE_IMP_NEW;
646 init_waitqueue_head(&imp->imp_recovery_waitq);
648 atomic_set(&imp->imp_refcount, 2);
649 atomic_set(&imp->imp_inflight, 0);
650 atomic_set(&imp->imp_replay_inflight, 0);
651 INIT_LIST_HEAD(&imp->imp_conn_list);
652 INIT_LIST_HEAD(&imp->imp_handle.h_link);
653 class_handle_hash(&imp->imp_handle, import_handle_addref);
/*
 * Start tearing down @import: the handle is unhashed, imp_generation is
 * bumped, in-flight requests are aborted through
 * ptlrpc_abort_inflight_superhack(), and one reference is dropped with
 * class_import_put().
 *
 * @import must be neither NULL nor LP_POISON (both are asserted).
 */
658 void class_destroy_import(struct obd_import *import)
660 LASSERT(import != NULL);
661 LASSERT(import != LP_POISON);
663 class_handle_unhash(&import->imp_handle);
665 /* Abort any inflight DLM requests and NULL out their (about to be
667 /* Invalidate all requests on import, would be better to call
668 ptlrpc_set_import_active(imp, 0); */
669 import->imp_generation++;
670 ptlrpc_abort_inflight_superhack(import);
672 class_import_put(import);
675 /* A connection defines an export context in which preallocation can
676 be managed. This releases the export pointer reference, and returns
677 the export handle, so the export refcount is 1 when this function
returns. */
/*
 * Create an export on @obd for the client @cluuid and hand its handle cookie
 * back through @conn.
 *
 * All three arguments are asserted non-NULL. After the cookie and the client
 * uuid have been copied, the export reference obtained from
 * class_new_export() is dropped with class_export_put().
 *
 * NOTE(review): the check for a failed class_new_export() and the return
 * statements are not visible in this listing.
 */
679 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
680 struct obd_uuid *cluuid)
682 struct obd_export *export;
683 LASSERT(conn != NULL);
684 LASSERT(obd != NULL);
685 LASSERT(cluuid != NULL);
688 export = class_new_export(obd);
692 conn->cookie = export->exp_handle.h_cookie;
693 memcpy(&export->exp_client_uuid, cluuid,
694 sizeof(export->exp_client_uuid));
/* the caller keeps only the cookie, not the pointer */
695 class_export_put(export);
697 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
698 cluuid->uuid, conn->cookie);
702 /* This function removes two references from the export: one for the
703 * hash entry and one for the export pointer passed in. The export
704 * pointer passed to this function is destroyed and should not be used
 * afterwards. */
/*
 * Disconnect @export.
 *
 * exp_disconnected is tested and set under exp_lock, so only the first
 * caller carries on to class_unlink_export() and the final
 * class_export_put(); class_unlink_export() itself also drops one reference.
 *
 * NOTE(review): the return statements, including those for a NULL export and
 * for an already-disconnected export, are not visible in this listing.
 */
706 int class_disconnect(struct obd_export *export)
708 int already_disconnected;
711 if (export == NULL) {
713 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
717 spin_lock(&export->exp_lock);
718 already_disconnected = export->exp_disconnected;
719 export->exp_disconnected = 1;
720 spin_unlock(&export->exp_lock);
722 /* class_cleanup, abort_recovery, ptlrpc_fail_export, and
723 ping_evictor_fail_export all end up in here, and if any of them
724 race we shouldn't call extra class_export_puts. */
725 if (already_disconnected)
728 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
729 export->exp_handle.h_cookie);
731 class_unlink_export(export);
732 class_export_put(export);
/*
 * Disconnect every export on @list, stamping each with @flags.
 *
 * The head entry is processed on each pass until the list is empty. An
 * export whose client uuid equals its obd's own uuid is only taken off the
 * list, not disconnected. Any other export is looked up again through its
 * handle cookie with class_conn2export() and handed to obd_disconnect().
 *
 * NOTE(review): the loop ends only because each pass removes the head entry
 * from @list. For the path that follows a failed cookie lookup no removal is
 * visible in this listing -- confirm it cannot spin.
 */
736 static void class_disconnect_export_list(struct list_head *list, int flags)
739 struct lustre_handle fake_conn;
740 struct obd_export *fake_exp, *exp;
743 /* It's possible that an export may disconnect itself, but
744 * nothing else will be added to this list. */
745 while(!list_empty(list)) {
746 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
/* hold the export while it is being worked on */
747 class_export_get(exp);
748 exp->exp_flags = flags;
750 if (obd_uuid_equals(&exp->exp_client_uuid,
751 &exp->exp_obd->obd_uuid)) {
753 "exp %p export uuid == obd uuid, don't discon\n",
755 /* Need to delete this now so we don't end up pointing
756 * to work_list later when this export is cleaned up. */
757 list_del_init(&exp->exp_obd_chain);
758 class_export_put(exp);
/* look the export up again through its handle cookie */
762 fake_conn.cookie = exp->exp_handle.h_cookie;
763 fake_exp = class_conn2export(&fake_conn);
765 class_export_put(exp);
768 fake_exp->exp_flags = flags;
769 rc = obd_disconnect(fake_exp);
770 class_export_put(exp);
772 CDEBUG(D_HA, "disconnecting export %p failed: %d\n",
775 CDEBUG(D_HA, "export %p disconnected\n", exp);
/*
 * Build the export flag word for @obd: OBD_OPT_FAILOVER when obd_fail is
 * set, OR-ed with OBD_OPT_FORCE when obd_force is set.
 */
781 static inline int get_exp_flags_from_obd(struct obd_device *obd)
783 return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
784 (obd->obd_force ? OBD_OPT_FORCE : 0));
/*
 * Disconnect all exports of @obd.
 *
 * Under obd_dev_lock, work_list is inserted as a node into the obd_exports
 * chain and the obd_exports head is then unlinked and re-initialised. That
 * leaves work_list heading all the former entries and obd_exports empty. The
 * entries are then disconnected outside the lock with the flags derived from
 * @obd.
 */
787 void class_disconnect_exports(struct obd_device *obd)
789 struct list_head work_list;
792 /* Move all of the exports from obd_exports to a work list, en masse. */
793 spin_lock(&obd->obd_dev_lock);
/* list_add() writes both of work_list's links, so no prior init is needed */
794 list_add(&work_list, &obd->obd_exports);
795 list_del_init(&obd->obd_exports);
796 spin_unlock(&obd->obd_dev_lock);
798 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
799 "disconnecting them\n", obd->obd_minor, obd);
800 class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
804 /* Remove exports that have not completed recovery. */
/*
 * Disconnect the exports of @obd that still have exp_replay_needed set.
 *
 * Those exports are moved from obd_exports to a private work list under
 * obd_dev_lock, and the work list is then disconnected outside the lock with
 * the flags derived from @obd.
 *
 * NOTE(review): the counter reported in the "stale clients" message is
 * maintained on lines not visible in this listing.
 */
806 void class_disconnect_stale_exports(struct obd_device *obd)
808 struct list_head work_list;
809 struct list_head *pos, *n;
810 struct obd_export *exp;
814 INIT_LIST_HEAD(&work_list);
815 spin_lock(&obd->obd_dev_lock);
/* the _safe iterator is needed because entries are unlinked during the walk */
816 list_for_each_safe(pos, n, &obd->obd_exports) {
817 exp = list_entry(pos, struct obd_export, exp_obd_chain);
818 if (exp->exp_replay_needed) {
819 list_del(&exp->exp_obd_chain);
820 list_add(&exp->exp_obd_chain, &work_list);
824 spin_unlock(&obd->obd_dev_lock);
826 CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
828 class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
/*
 * Allocate and initialise an obd_io_group: its lock, a pending count of 0, a
 * reference count of 1, its wait queue and an empty callback-context list.
 *
 * NOTE(review): the allocation-failure test, the store through @oig_out and
 * the return statements are not visible in this listing.
 */
832 int oig_init(struct obd_io_group **oig_out)
834 struct obd_io_group *oig;
837 OBD_ALLOC(oig, sizeof(*oig));
841 spin_lock_init(&oig->oig_lock);
843 oig->oig_pending = 0;
844 atomic_set(&oig->oig_refcount, 1);
845 init_waitqueue_head(&oig->oig_waitq);
846 INIT_LIST_HEAD(&oig->oig_occ_list);
/* Take a reference on @oig; released again by oig_release(). */
852 static inline void oig_grab(struct obd_io_group *oig)
854 atomic_inc(&oig->oig_refcount);
/* Drop a reference on @oig and free it when the count reaches zero. */
856 void oig_release(struct obd_io_group *oig)
858 if (atomic_dec_and_test(&oig->oig_refcount))
859 OBD_FREE(oig, sizeof(*oig));
/*
 * Queue the callback context @occ on @oig: it is appended to oig_occ_list
 * under oig_lock with interrupts disabled.
 *
 * NOTE(review): two statements between taking the lock and the list
 * insertion are not visible in this listing; presumably the pending count is
 * updated there -- confirm.
 */
862 void oig_add_one(struct obd_io_group *oig,
863 struct oig_callback_context *occ)
866 CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
867 spin_lock_irqsave(&oig->oig_lock, flags);
870 list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
871 spin_unlock_irqrestore(&oig->oig_lock, flags);
/*
 * Record the completion of one callback context on @oig with result @rc.
 *
 * Under oig_lock, @occ is unlinked from the group's list and oig_pending is
 * decremented; when it reaches zero or below, the group's wait queue is
 * selected for a wake-up. The "oig_rc == 0 && rc != 0" test picks out the
 * first non-zero result.
 *
 * NOTE(review): the statement under that test, the wake-up call itself and
 * any reference handling are not visible in this listing. The debug message
 * reads oig fields after the lock has been dropped, as its own text admits.
 */
875 void oig_complete_one(struct obd_io_group *oig,
876 struct oig_callback_context *occ, int rc)
879 wait_queue_head_t *wake = NULL;
882 spin_lock_irqsave(&oig->oig_lock, flags);
885 list_del_init(&occ->occ_oig_item);
887 old_rc = oig->oig_rc;
888 if (oig->oig_rc == 0 && rc != 0)
891 if (--oig->oig_pending <= 0)
892 wake = &oig->oig_waitq;
894 spin_unlock_irqrestore(&oig->oig_lock, flags);
896 CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
897 "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
/*
 * Wait condition used by oig_wait(): tests, under oig_lock, whether
 * oig_pending has dropped to zero or below.
 *
 * NOTE(review): the result variable and the return statement are not visible
 * in this listing.
 */
904 static int oig_done(struct obd_io_group *oig)
908 spin_lock_irqsave(&oig->oig_lock, flags);
909 if (oig->oig_pending <= 0)
911 spin_unlock_irqrestore(&oig->oig_lock, flags);
/*
 * Interrupt handler installed by oig_wait() through LWI_INTR; @data is the
 * obd_io_group.
 *
 * Each callback context on oig_occ_list that is not yet marked interrupted
 * is marked, and its occ_interrupted() hook is called with oig_lock dropped.
 *
 * NOTE(review): the restart of the list walk described in the comment below
 * relies on a label/goto not visible in this listing.
 */
915 static void interrupted_oig(void *data)
917 struct obd_io_group *oig = data;
918 struct oig_callback_context *occ;
921 spin_lock_irqsave(&oig->oig_lock, flags);
922 /* We need to restart the processing each time we drop the lock, as
923 * it is possible other threads called oig_complete_one() to remove
924 * an entry elsewhere in the list while we dropped lock. We need to
925 * drop the lock because osc_ap_completion() calls oig_complete_one()
926 * which re-gets this lock ;-) as well as a lock ordering issue. */
928 list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
929 if (occ->interrupted)
/* mark first, so this entry is not visited again after a restart */
931 occ->interrupted = 1;
932 spin_unlock_irqrestore(&oig->oig_lock, flags);
933 occ->occ_interrupted(occ);
934 spin_lock_irqsave(&oig->oig_lock, flags);
937 spin_unlock_irqrestore(&oig->oig_lock, flags);
/*
 * Block until every callback context queued on @oig has completed.
 *
 * The first wait uses interrupted_oig() as its interrupt handler. After an
 * -EINTR result the wait info is replaced by a zeroed one and the wait is
 * repeated, so the function does not leave while oig_done() is still false.
 * On exit oig_pending is asserted to be 0.
 *
 * NOTE(review): the return statement is not visible in this listing;
 * presumably oig->oig_rc is returned -- confirm.
 */
940 int oig_wait(struct obd_io_group *oig)
942 struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
945 CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
948 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
949 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
950 /* we can't continue until the oig has emptied and stopped
951 * referencing state that the caller will free upon return */
953 lwi = (struct l_wait_info){ 0, };
954 } while (rc == -EINTR);
956 LASSERTF(oig->oig_pending == 0,
957 "exiting oig_wait(oig = %p) with %d pending\n", oig,
960 CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
965 /* Ping evictor thread */
/* State value that tells the evictor thread to exit (set by
 * ping_evictor_stop()). PET_READY, used by ping_evictor_main(), is defined
 * on a line not visible in this listing. */
967 #define PET_TERMINATE 2
/* Start/stop nesting count maintained by ping_evictor_start()/_stop(). */
970 static int pet_refcount = 0;
971 static int pet_state;
/* The evictor thread sleeps here until pet_exp is set or it is told to
 * terminate. */
972 static wait_queue_head_t pet_waitq;
/* Export that names the obd whose timed-export list is to be scanned. */
973 static struct obd_export *pet_exp = NULL;
/* Taken in ping_evictor_wake(). */
974 static spinlock_t pet_lock = SPIN_LOCK_UNLOCKED;
/*
 * Ask the ping evictor thread to scan the obd that @exp belongs to.
 *
 * NOTE(review): the test made under pet_lock, the assignment of pet_exp, the
 * wake-up call and the return values are not visible in this listing. The
 * one visible caller treats a zero return as "request accepted".
 * class_export_get(pet_exp) runs after pet_lock has been released -- confirm
 * that the evictor thread cannot drop and clear pet_exp in between.
 */
976 static int ping_evictor_wake(struct obd_export *exp)
979 spin_lock(&pet_lock);
981 /* eventually the new obd will call here again. */
982 spin_unlock(&pet_lock);
986 spin_unlock(&pet_lock);
988 /* We have to make sure the obd isn't destroyed between now and when
989 the ping evictor runs. We'll take a reference here, and drop it
990 when we finish in the evictor. We don't really care about this
991 export in particular; we just need one to keep the obd. */
992 class_export_get(pet_exp);
999 /* Same as ptlrpc_fail_export, but this module must load first... */
/*
 * Mark @exp as failed and disconnect it.
 *
 * exp_failed is tested and set under exp_lock with interrupts disabled, so
 * only the first caller carries on to obd_disconnect(). An extra export
 * reference is taken before that call.
 *
 * NOTE(review): the return in the already-failed branch and the condition
 * selecting between the two CERROR messages are not visible in this listing.
 */
1000 void ping_evictor_fail_export(struct obd_export *exp)
1002 int rc, already_failed;
1003 unsigned long flags;
1005 spin_lock_irqsave(&exp->exp_lock, flags);
1006 already_failed = exp->exp_failed;
1007 exp->exp_failed = 1;
1008 spin_unlock_irqrestore(&exp->exp_lock, flags);
1010 if (already_failed) {
1011 CDEBUG(D_PET, "disconnecting dead export %p/%s; skipping\n",
1012 exp, exp->exp_client_uuid.uuid);
1016 CDEBUG(D_PET, "disconnecting export %p/%s\n",
1017 exp, exp->exp_client_uuid.uuid);
1019 /* Most callers into obd_disconnect are removing their own reference
1020 * (request, for example) in addition to the one from the hash table.
1021 * We don't have such a reference here, so make one. */
1022 class_export_get(exp);
1023 rc = obd_disconnect(exp);
1025 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1026 CERROR("disconnected export %p/%s\n",
1027 exp, exp->exp_client_uuid.uuid);
/*
 * Body of the ping evictor kernel thread (created in ping_evictor_start()).
 *
 * After daemonizing and blocking all signals the thread marks itself
 * PET_READY and sleeps on pet_waitq until pet_exp is set or pet_state
 * becomes PET_TERMINATE. When woken with an export it walks
 * obd_exports_timed of that export's obd and calls
 * ping_evictor_fail_export() on each export whose exp_last_request_time is
 * older than CURRENT_SECONDS - 3 * obd_timeout / 2, printing a console
 * warning that names the peer. obd_dev_lock is dropped while an individual
 * export is examined; an extra reference holds the export meanwhile. The
 * reference on pet_exp is dropped once the scan is finished.
 *
 * Fix: the sigfillset() argument had been corrupted to a currency sign
 * followed by "t->blocked" (the text "&curren" was taken for an HTML
 * entity); it is restored to &current->blocked.
 *
 * NOTE(review): the enclosing loop, the early exit from the list walk, the
 * clearing of pet_exp and the return statement are not visible in this
 * listing.
 */
1030 static int ping_evictor_main(void *arg)
1032 struct list_head *pos, *n;
1033 struct obd_device *obd;
1034 struct obd_export *exp;
1035 struct l_wait_info lwi = { 0 };
1037 unsigned long flags;
1041 kportal_daemonize("ping_evictor");
/* block every signal for this thread */
1042 SIGNAL_MASK_LOCK(current, flags);
1043 sigfillset(&current->blocked);
1045 SIGNAL_MASK_UNLOCK(current, flags);
1048 CDEBUG(D_PET, "Starting Ping Evictor\n");
1050 pet_state = PET_READY;
1052 l_wait_event(pet_waitq, pet_exp ||
1053 (pet_state == PET_TERMINATE), &lwi);
1054 if (pet_state == PET_TERMINATE)
1057 obd = pet_exp->exp_obd;
/* exports whose last request is older than this are evicted */
1058 expire_time = CURRENT_SECONDS - (3 * obd_timeout / 2);
1060 CDEBUG(D_PET, "evicting all exports of obd %s older than %ld\n",
1061 obd->obd_name, expire_time);
1063 /* Exports can't be deleted out of the list, which means we
1064 can't lose the last ref on the export, while we hold the obd
1065 lock (class_unlink_export). If they've already been
1066 removed from the list, we won't find them here. */
1067 spin_lock(&obd->obd_dev_lock);
1068 list_for_each_safe(pos, n, &obd->obd_exports_timed) {
1070 exp = list_entry(pos, struct obd_export,
1071 exp_obd_chain_timed);
/* pin the export before the list lock is dropped */
1072 class_export_get(exp);
1073 spin_unlock(&obd->obd_dev_lock);
1075 if (expire_time > exp->exp_last_request_time) {
1076 char ipbuf[PTL_NALFMT_SIZE];
1077 struct ptlrpc_peer *peer;
1079 peer = exp->exp_connection ?
1080 &exp->exp_connection->c_peer : NULL;
1082 if (peer && peer->peer_ni) {
1083 portals_nid2str(peer->peer_ni->pni_number,
1088 LCONSOLE_WARN("%s hasn't heard from %s in %ld "
1089 "seconds. I think it's dead, "
1090 "and I am evicting it.\n",
1092 (peer && peer->peer_ni) ?
1094 (char *)exp->exp_client_uuid.uuid,
1095 (long)(CURRENT_SECONDS -
1096 exp->exp_last_request_time));
1098 ping_evictor_fail_export(exp);
1100 /* List is sorted, so everyone below is ok */
1103 class_export_put(exp);
1104 /* lock again for the next entry */
1105 spin_lock(&obd->obd_dev_lock);
1110 spin_unlock(&obd->obd_dev_lock);
/* drop the reference taken in ping_evictor_wake() */
1111 class_export_put(pet_exp);
1114 CDEBUG(D_PET, "Exiting Ping Evictor\n");
/*
 * Start the ping evictor thread.
 *
 * pet_refcount is incremented and tested first; from there the wait queue is
 * initialised and ping_evictor_main() is launched with kernel_thread(). A
 * failure to create the thread is reported with CERROR.
 *
 * NOTE(review): presumably only the first caller reaches the thread
 * creation; the statement under the refcount test is not visible in this
 * listing. No lock is taken around pet_refcount in the visible lines.
 */
1120 void ping_evictor_start(void)
1125 if (++pet_refcount > 1)
1128 init_waitqueue_head(&pet_waitq);
1130 rc = kernel_thread(ping_evictor_main, NULL, CLONE_VM | CLONE_FS);
1133 CERROR("Cannot start ping evictor thread: %d\n", rc);
/*
 * Stop the ping evictor thread.
 *
 * pet_refcount is decremented and tested first; from there pet_state is set
 * to PET_TERMINATE and the thread is woken through pet_waitq.
 *
 * NOTE(review): presumably only the last caller reaches the termination
 * request; the statement under the refcount test is not visible in this
 * listing. No wait for the thread to exit is visible either.
 */
1138 void ping_evictor_stop(void)
1141 if (--pet_refcount > 0)
1144 pet_state = PET_TERMINATE;
1145 wake_up(&pet_waitq);
1149 /* This function makes sure dead exports are evicted in a timely manner.
1150 This function is only called when some export receives a message (i.e.,
1151 the network is up.) */
1152 void class_update_export_timer(struct obd_export *exp, time_t extra_delay)
1154 struct obd_export *oldest_exp;
1158 /* Compensate for slow machines, etc, by faking our request time
1159 into the future. Although this can break the strict time-ordering
1160 of the list, we can be really lazy here - we don't have to evict
1161 at the exact right moment. Eventually, all silent exports
1162 will make it to the top of the list. */
1163 exp->exp_last_request_time = max(exp->exp_last_request_time,
1164 (time_t)CURRENT_SECONDS + extra_delay);
1166 CDEBUG(D_PET, "updating export %s at %ld\n",
1167 exp->exp_client_uuid.uuid,
1168 exp->exp_last_request_time);
1170 /* exports may get disconnected from the chain even though the
1171 export has references, so we must keep the spin lock while
1172 manipulating the lists */
1173 spin_lock(&exp->exp_obd->obd_dev_lock);
1175 if (list_empty(&exp->exp_obd_chain_timed)) {
1176 /* this one is not timed */
1177 spin_unlock(&exp->exp_obd->obd_dev_lock);
1181 list_move_tail(&exp->exp_obd_chain_timed,
1182 &exp->exp_obd->obd_exports_timed);
1183 oldest_exp = list_entry(exp->exp_obd->obd_exports_timed.next,
1184 struct obd_export, exp_obd_chain_timed);
1185 oldest_time = oldest_exp->exp_last_request_time;
1186 spin_unlock(&exp->exp_obd->obd_dev_lock);
1188 if (exp->exp_obd->obd_recoverable_clients > 0)
1189 /* be nice to everyone during recovery */
1192 /* Note - racing to start/reset the obd_eviction timer is safe */
1193 if (exp->exp_obd->obd_eviction_timer == 0) {
1194 /* Check if the oldest entry is expired. */
1195 if (CURRENT_SECONDS > (oldest_time +
1196 (3 * obd_timeout / 2) + extra_delay)) {
1197 /* We need a second timer, in case the net was
1198 down and it just came back. Since the pinger
1199 may skip every other PING_INTERVAL (see note in
1200 ptlrpc_pinger_main), we better wait for 3. */
1201 exp->exp_obd->obd_eviction_timer = CURRENT_SECONDS +
1204 "Thinking about evicting old export from %ld\n",
1208 if (CURRENT_SECONDS > (exp->exp_obd->obd_eviction_timer +
1210 /* The evictor won't evict anyone who we've heard from
1211 recently, so we don't have to check before we start
1213 if (!ping_evictor_wake(exp))
1214 exp->exp_obd->obd_eviction_timer = 0;