/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (C) 2001-2003 Cluster File Systems, Inc. * Author Peter Braam * * This file is part of Lustre, http://www.lustre.org. * * Lustre is free software; you can redistribute it and/or * modify it under the terms of version 2 of the GNU General Public * License as published by the Free Software Foundation. * * Lustre is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with Lustre; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. * * For testing and management it is treated as an obd_device, * although * it does not export a full OBD method table (the * requests are coming * in over the wire, so object target modules * do not have a full * method table.) * */ #define DEBUG_SUBSYSTEM S_OSC #ifdef __KERNEL__ # include # include # include # include # include # if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) # include # include # else # include # endif #else /* __KERNEL__ */ # include #endif #include #include #include #include #ifndef __CYGWIN__ # include # include #else # include #endif #include #include /* for OBD_FAIL_CHECK */ #include #define LIOD_STOP 0 static struct osc_rpcd_ctl { unsigned long orc_flags; spinlock_t orc_lock; struct completion orc_starting; struct completion orc_finishing; struct list_head orc_req_list; wait_queue_head_t orc_waitq; struct ptlrpc_request_set *orc_set; } osc_orc; static DECLARE_MUTEX(osc_rpcd_sem); static int osc_rpcd_users = 0; void osc_rpcd_add_req(struct ptlrpc_request *req) { struct osc_rpcd_ctl *orc = &osc_orc; ptlrpc_set_add_new_req(orc->orc_set, req); wake_up(&orc->orc_waitq); } static int osc_rpcd_check(struct osc_rpcd_ctl *orc) { struct list_head *tmp, *pos; struct ptlrpc_request *req; unsigned long flags; int rc = 0; ENTRY; if (test_bit(LIOD_STOP, &orc->orc_flags)) RETURN(1); spin_lock_irqsave(&orc->orc_set->set_new_req_lock, flags); list_for_each_safe(pos, tmp, &orc->orc_set->set_new_requests) { req = list_entry(pos, struct ptlrpc_request, rq_set_chain); list_del_init(&req->rq_set_chain); ptlrpc_set_add_req(orc->orc_set, req); rc = 1; /* need to calculate its timeout */ } spin_unlock_irqrestore(&orc->orc_set->set_new_req_lock, flags); if (orc->orc_set->set_remaining) { rc = rc | ptlrpc_check_set(orc->orc_set); /* XXX our set never completes, so we prune the completed * reqs after each iteration. boy could this be smarter. */ list_for_each_safe(pos, tmp, &orc->orc_set->set_requests) { req = list_entry(pos, struct ptlrpc_request, rq_set_chain); if (req->rq_phase != RQ_PHASE_COMPLETE) continue; list_del_init(&req->rq_set_chain); req->rq_set = NULL; ptlrpc_req_finished (req); } } RETURN(rc); } #ifdef __KERNEL__ /* ptlrpc's code paths like to execute in process context, so we have this * thread which spins on a set which contains the io rpcs. llite specifies * osc_rpcd's set when it pushes pages down into the oscs */ static int osc_rpcd(void *arg) { struct osc_rpcd_ctl *orc = arg; unsigned long flags; ENTRY; kportal_daemonize("liod_writeback"); SIGNAL_MASK_LOCK(current, flags); sigfillset(¤t->blocked); RECALC_SIGPENDING; SIGNAL_MASK_UNLOCK(current, flags); complete(&orc->orc_starting); /* like kswapd */ current->flags |= PF_MEMALLOC; /* this mainloop strongly resembles ptlrpc_set_wait except * that our set never completes. osc_rpcd_check calls ptlrpc_check_set * when there are requests in the set. new requests come in * on the set's new_req_list and osc_rpcd_check moves them into * the set. */ while (1) { wait_queue_t set_wait; struct l_wait_info lwi; int timeout; timeout = ptlrpc_set_next_timeout(orc->orc_set) * HZ; lwi = LWI_TIMEOUT(timeout, ptlrpc_expired_set, orc->orc_set); /* ala the pinger, wait on orc's waitqueue and the set's */ init_waitqueue_entry(&set_wait, current); add_wait_queue(&orc->orc_set->set_waitq, &set_wait); l_wait_event(orc->orc_waitq, osc_rpcd_check(orc), &lwi); remove_wait_queue(&orc->orc_set->set_waitq, &set_wait); if (test_bit(LIOD_STOP, &orc->orc_flags)) break; } /* XXX should be making sure we don't have anything in flight */ complete(&orc->orc_finishing); return 0; } #else static int osc_rpcd_recurred = 0; static void *osc_rpcd_callback; int osc_check_async_rpcs(void *arg) { struct osc_rpcd_ctl *orc = arg; int rc = 0; /* single threaded!! */ osc_rpcd_recurred++; if (osc_rpcd_recurred == 1) rc = osc_rpcd_check(orc); osc_rpcd_recurred--; return rc; } #endif int osc_rpcd_addref(void) { struct osc_rpcd_ctl *orc = &osc_orc; int rc = 0; ENTRY; down(&osc_rpcd_sem); if (++osc_rpcd_users != 1) GOTO(out, rc); memset(orc, 0, sizeof(*orc)); init_completion(&orc->orc_starting); init_completion(&orc->orc_finishing); init_waitqueue_head(&orc->orc_waitq); orc->orc_flags = 0; spin_lock_init(&orc->orc_lock); INIT_LIST_HEAD(&orc->orc_req_list); orc->orc_set = ptlrpc_prep_set(); if (orc->orc_set == NULL) GOTO(out, rc = -ENOMEM); #ifdef __KERNEL__ if (kernel_thread(osc_rpcd, orc, 0) < 0) { ptlrpc_set_destroy(orc->orc_set); GOTO(out, rc = -ECHILD); } wait_for_completion(&orc->orc_starting); #else osc_rpcd_callback = liblustre_register_wait_callback(&osc_check_async_rpcs, orc); #endif out: up(&osc_rpcd_sem); RETURN(rc); } void osc_rpcd_decref(void) { struct osc_rpcd_ctl *orc = &osc_orc; down(&osc_rpcd_sem); if (--osc_rpcd_users == 0) { set_bit(LIOD_STOP, &orc->orc_flags); wake_up(&orc->orc_waitq); #ifdef __KERNEL__ wait_for_completion(&orc->orc_finishing); #else liblustre_deregister_wait_callback(osc_rpcd_callback); #endif ptlrpc_set_destroy(orc->orc_set); } up(&osc_rpcd_sem); }