1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2002 Cluster File Systems, Inc. <info@clusterfs.com>
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 #define DEBUG_SUBSYSTEM S_CMOBD
24 #include <linux/version.h>
25 #include <linux/init.h>
26 #include <linux/obd_support.h>
27 #include <linux/lustre_lib.h>
28 #include <linux/lustre_net.h>
29 #include <linux/lustre_idl.h>
30 #include <linux/obd_class.h>
31 #include <linux/lustre_mds.h>
32 #include <linux/lustre_cmobd.h>
34 #include <asm/div64.h>
35 #include <linux/pagemap.h>
37 #include "cm_internal.h"
39 extern kmem_cache_t *cmobd_extent_slab;
41 /* helper function to split an extent */
/* Return the number of @interval-sized chunks needed to cover the
 * inclusive byte range [ext->start, ext->end].
 * NOTE(review): do_div() replaces buf_count with the quotient in place
 * and returns the remainder; the elided tail presumably rounds the
 * count up when remainder != 0 — confirm against the full source. */
42 static obd_count split_extent(struct ldlm_extent *ext, unsigned long interval)
44 obd_count buf_count, remainder;
/* extent boundaries are inclusive, hence the +1 */
47 buf_count = ext->end - ext->start + 1;
48 LASSERT(buf_count > 0);
50 remainder = do_div(buf_count, interval);
/* Async-page callback: make the page ready for write-out by taking the
 * page lock. Read commands are rejected (write-replay path only).
 * NOTE(review): the branch bodies for OBD_BRW_READ and a failed
 * TryLockPage() are elided here — presumably both RETURN an error. */
57 static int cmobd_ap_make_ready(void *data, int cmd)
59 struct cmobd_async_page *cmap = (struct cmobd_async_page *)data;
60 struct page *page = cmap->cmap_page;
/* this path only replays writes; reads are not expected */
63 if (cmd == OBD_BRW_READ)
/* non-blocking lock attempt; a held lock means I/O is in flight */
66 if (TryLockPage(page))
/* Async-page callback: recompute how many bytes of this page should be
 * written, guarding against a concurrent truncate and a partial page at
 * end-of-file. Returns the byte count (full page, tail remainder, or —
 * presumably, in the elided truncate branch — zero). */
72 static int cmobd_ap_refresh_count(void *data, int cmd)
74 struct cmobd_async_page *cmap = (struct cmobd_async_page *)data;
75 struct page *page = cmap->cmap_page;
76 struct inode *inode = page->mapping->host;
/* only writes reach this path */
79 LASSERT(cmd != OBD_BRW_READ);
81 /* catch race with truncate */
82 if (((loff_t)page->index << PAGE_SHIFT) >= inode->i_size)
85 /* catch sub-page write at end of file */
86 if (((loff_t)page->index << PAGE_SHIFT) + PAGE_SIZE > inode->i_size)
/* last page: only the bytes up to i_size are valid */
87 RETURN(inode->i_size % PAGE_SIZE);
/* Async-page callback: populate @oa for the brw RPC from the extent
 * set's template obdo and the cached inode backing the page. For
 * writes, also packs the inode fid and requests m/ctime updates. */
92 static void cmobd_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
94 struct cmobd_async_page *cmap = (struct cmobd_async_page *)data;
95 obd_valid valid_flags;
104 inode = cmap->cmap_page->mapping->host;
/* object identity comes from the extent set's template obdo */
105 oa->o_id = cmap->cmap_es->es_oa.o_id;
106 oa->o_gr = cmap->cmap_es->es_oa.o_gr;
107 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
108 valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
109 if (cmd == OBD_BRW_WRITE) {
110 oa->o_valid |= OBD_MD_FLIFID;
112 /* FIXME-UMKA: should be here some mds num and mds id? */
113 mdc_pack_id(obdo_id(oa), inode->i_ino, 0,
114 inode->i_mode, 0, 0);
/* writes must also carry updated modification/change times */
115 valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
/* copy the selected attributes from the cache inode into @oa */
118 obdo_from_inode(oa, inode, valid_flags);
/* Async-page callback: I/O on one page finished. Unlinks the page from
 * its extent set, tears down the async page, drops the page reference,
 * and wakes the set's waiter when the set has fully drained.
 * NOTE(review): the elided lines between 145 and 147 presumably set a
 * local "wakeup" flag tested before the wake_up() — confirm. */
124 static void cmobd_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
126 struct cmobd_async_page *cmap = (struct cmobd_async_page *)data;
127 struct cmobd_extent_set *set = cmap->cmap_es;
133 page = cmap->cmap_page;
/* the page was locked by cmobd_ap_make_ready() before the I/O */
134 LASSERT(PageLocked(page));
/* es_lock protects es_pages and es_count; irqsave since callbacks
 * may run in interrupt context */
140 spin_lock_irqsave(&set->es_lock, flags);
141 LASSERT(!list_empty(&set->es_pages));
142 LASSERT(!list_empty(&cmap->cmap_link));
144 list_del_init(&cmap->cmap_link);
/* set is drained once no pages remain and no extents are pending */
145 if (list_empty(&set->es_pages) && !set->es_count)
147 spin_unlock_irqrestore(&set->es_lock, flags);
149 obd_teardown_async_page(set->es_exp, set->es_lsm, NULL,
151 OBD_FREE(cmap, sizeof(*cmap));
/* drop the reference taken in cmobd_send_pages() */
154 page_cache_release(page);
157 wake_up(&set->es_waitq);
/* Callback table handed to obd_prep_async_page() for every replayed
 * page; see the cmobd_ap_* functions above for individual contracts. */
162 static struct obd_async_page_ops cmobd_async_page_ops = {
163 .ap_make_ready = cmobd_ap_make_ready,
164 .ap_refresh_count = cmobd_ap_refresh_count,
165 .ap_fill_obdo = cmobd_ap_fill_obdo,
166 .ap_completion = cmobd_ap_completion,
/* Queue @oa_bufs local pages to the master export as async writes.
 * For each page: allocate a cmobd_async_page wrapper, prep it with
 * cmobd_async_page_ops, link it on the extent set, and queue async I/O.
 * If async queuing fails, fall back to synchronous group I/O for that
 * page and tear the wrapper down again.
 * NOTE(review): several error/return lines are elided in this view
 * (e.g. the branch bodies after OBD_ALLOC and obd_prep_async_page
 * failures) — the comments below describe only what is visible. */
169 static int cmobd_send_pages(struct obd_device *obd,
170 struct niobuf_local *lnb,
172 struct cmobd_extent_set *set)
174 struct cm_obd *cmobd = &obd->u.cm;
175 struct obd_export *exp = cmobd->master_exp;
176 struct cmobd_async_page *cmap = NULL;
182 for (i = 0; i < oa_bufs; i++, lnb++) {
184 OBD_ALLOC(cmap, sizeof(*cmap));
186 CERROR("Not enough memory\n");
190 INIT_LIST_HEAD(&cmap->cmap_link);
191 cmap->cmap_page = lnb->page;
/* register the page with the async-I/O engine; cmap is passed back
 * to the cmobd_ap_* callbacks as opaque data */
194 rc = obd_prep_async_page(exp, set->es_lsm, NULL, lnb->page,
195 lnb->offset, &cmobd_async_page_ops,
196 cmap, &cmap->cmap_cookie);
198 CERROR("cmobd prep async page failed page(%p) rc(%d)\n",
200 OBD_FREE(cmap, sizeof(*cmap));
204 LASSERT(cmap->cmap_page);
205 LASSERT(!PageLocked(cmap->cmap_page));
206 LASSERT(Page_Uptodate(cmap->cmap_page));
/* hold a reference until cmobd_ap_completion() releases it */
207 page_cache_get(cmap->cmap_page);
209 spin_lock_irqsave(&set->es_lock, flags);
210 list_add_tail(&cmap->cmap_link, &set->es_pages);
211 spin_unlock_irqrestore(&set->es_lock, flags);
213 rc = obd_queue_async_io(exp, set->es_lsm, NULL, cmap->cmap_cookie,
214 OBD_BRW_WRITE, 0, 0, 0, 0);
215 if (rc) { /* try sync io */
216 struct obd_io_group *oig;
/* async path refused the page: take it back off the set list
 * and push it through the synchronous group-I/O path instead */
218 spin_lock_irqsave(&set->es_lock, flags);
219 list_del_init(&cmap->cmap_link);
220 spin_unlock_irqrestore(&set->es_lock, flags);
222 lock_page(cmap->cmap_page);
228 rc = obd_queue_group_io(exp, set->es_lsm, NULL, oig,
230 OBD_BRW_WRITE, 0, lnb->len, 0,
231 ASYNC_READY | ASYNC_URGENT |
238 rc = obd_trigger_group_io(exp, set->es_lsm, NULL, oig);
/* sync path completes here, so undo the ref/prep ourselves */
246 unlock_page(cmap->cmap_page);
247 page_cache_release(cmap->cmap_page);
248 obd_teardown_async_page(exp, set->es_lsm, NULL,
250 OBD_FREE(cmap, sizeof(*cmap));
252 CERROR("cmobd sync io failed\n");
/* Replay one extent: read its pages from the cache OBD via preprw,
 * push them to the master via cmobd_send_pages(), commit the cache
 * read, then decrement the extent set's pending count and wake the
 * waiter when the set drains.
 * NOTE(review): allocation of @oa and several error-branch lines are
 * elided from this view — the OBD_ALLOC for oa presumably sits between
 * lines 279 and 282, matching the NULL check on line 282. */
260 static int cmobd_write_extent(struct obd_device *obd,
261 struct cmobd_extent_info *ei)
263 struct cmobd_extent_set *set = ei->ei_set;
264 struct cm_obd *cmobd = &obd->u.cm;
266 struct obd_ioobj ioo;
267 struct niobuf_local *lnb;
268 struct niobuf_remote *rnb;
269 obd_count i, oa_bufs;
272 int ret, rc = 0, wakeup = 0;
/* one remote niobuf per page in the extent */
275 oa_bufs = split_extent(&ei->ei_extent, PAGE_SIZE);
276 LASSERT(oa_bufs > 0);
278 OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
279 OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
282 if (lnb == NULL || rnb == NULL || oa == NULL)
283 GOTO(out, rc = -ENOMEM);
285 LASSERT(ei->ei_extent.end >= ei->ei_extent.start);
/* extent start must already be page-aligned (see cmobd_replay_write) */
286 LASSERT((ei->ei_extent.start & (PAGE_SIZE -1)) == 0);
288 for (i = 0, offset = ei->ei_extent.start; i < oa_bufs;
289 i++, offset += PAGE_SIZE) {
290 rnb[i].offset = offset;
/* last buffer may be a partial page */
291 rnb[i].len = MIN(PAGE_SIZE, ei->ei_extent.end - offset + 1);
294 memcpy(oa, &set->es_oa, sizeof(*oa));
295 obdo_to_ioobj(oa, &ioo);
296 ioo.ioo_bufcnt = oa_bufs;
/* map the cached pages for reading (preprw/commitrw bracket) */
298 ret = obd_preprw(OBD_BRW_READ, cmobd->cache_exp, oa, 1, &ioo,
299 oa_bufs, rnb, lnb, NULL);
303 rc = cmobd_send_pages(obd, lnb, oa_bufs, set);
305 CERROR("cmobd_send_pages failed %d\n", rc);
307 rc = obd_commitrw(OBD_BRW_READ, cmobd->cache_exp, oa, 1, &ioo,
308 oa_bufs, lnb, NULL, ret);
310 /* countdown and wake up */
311 spin_lock_irqsave(&set->es_lock, flags);
312 LASSERT(set->es_count);
316 spin_unlock_irqrestore(&set->es_lock, flags);
319 wake_up(&set->es_waitq);
323 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
325 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
/* Pop the next queued extent off the write service, or return NULL if
 * the queue is empty. Wakes producers blocked in cmobd_queue_extent()
 * once the queue has room again (ws_nextents below the cap).
 * NOTE(review): the ws_nextents decrement and the wakeup-flag logic
 * between lines 344 and 350 are elided — confirm against full source. */
332 static struct cmobd_extent_info* get_next_ei(struct cmobd_write_service *ws)
334 struct cmobd_extent_info *ei = NULL;
338 spin_lock_irqsave(&ws->ws_extent_lock, flags);
339 if (!list_empty(&ws->ws_extents)) {
340 ei = list_entry(ws->ws_extents.next,
341 struct cmobd_extent_info, ei_link);
342 list_del_init(&ei->ei_link);
/* queue dropped below the high-water mark: let providers in */
344 if (ws->ws_nextents < CMOBD_MAX_EXTENTS)
347 spin_unlock_irqrestore(&ws->ws_extent_lock, flags);
350 wake_up_all(&ws->ws_waitq_provider);
/* Body of a write-replay worker thread (started via kernel_thread()
 * from cmobd_start_thread()). Blocks all signals, names itself, then
 * loops consuming extents from the write service until told to stop,
 * replaying each with cmobd_write_extent().
 * Fix: line 371 contained mojibake "¤t" — the HTML entity for
 * "&curren" swallowed "&current"; restored to &current->blocked. */
355 static int cmobd_write_main(void *arg)
357 struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
358 struct ptlrpc_thread *thread = data->thread;
359 struct obd_device *obd = data->dev;
360 struct cm_obd *cmobd = &obd->u.cm;
361 struct cmobd_write_service *ws = cmobd->write_srv;
362 struct cmobd_extent_info *extent = NULL;
/* kernel daemon: ignore every signal */
370 SIGNAL_MASK_LOCK(current, flags);
371 sigfillset(&current->blocked);
373 SIGNAL_MASK_UNLOCK(current, flags);
375 LASSERTF(strlen(data->name) < sizeof(current->comm),
376 "name %d > len %d\n",strlen(data->name),sizeof(current->comm));
377 THREAD_NAME(current->comm, sizeof(current->comm) - 1, "%s", data->name);
/* signal the starter (blocked in cmobd_start_thread) that we're up */
381 thread->t_flags = SVC_RUNNING;
382 wake_up(&thread->t_ctl_waitq);
384 /* Record that the thread is running */
385 spin_lock_irqsave(&ws->ws_thread_lock, flags);
387 spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
389 while ((thread->t_flags & SVC_STOPPING) == 0) {
390 struct l_wait_info lwi = { 0 };
/* exclusive wait: only one consumer wakes per queued extent */
392 l_wait_event_exclusive(ws->ws_waitq_consumer,
393 ((thread->t_flags & SVC_STOPPING) ||
394 ((extent = get_next_ei(ws)) !=
399 rc = cmobd_write_extent(obd, extent);
401 CERROR("write extent failed rc=%d\n", rc);
/* extent was allocated from cmobd_extent_slab by the producer */
402 OBD_SLAB_FREE(extent, cmobd_extent_slab, sizeof(*extent));
/* tell cmobd_stop_thread() we are done */
406 thread->t_flags = SVC_STOPPED;
407 wake_up(&thread->t_ctl_waitq);
409 spin_lock_irqsave(&ws->ws_thread_lock, flags);
410 ws->ws_nthreads--; /* must know immediately */
411 spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
416 /* functions for manipulating cmobd write replay threads, similar with
417 * ptlrpc threads functions */
/* Spawn one write-replay worker named @name, link it on ws_threads,
 * and wait until it reports SVC_RUNNING before returning.
 * NOTE(review): the lines filling struct ptlrpc_svc_data d (between
 * 432 and 439) are elided; d is stack-local, which is safe only
 * because we wait for SVC_RUNNING below before returning. */
418 static int cmobd_start_thread(struct obd_device *obd, char *name)
420 struct cm_obd *cmobd = &obd->u.cm;
421 struct cmobd_write_service *ws = cmobd->write_srv;
422 struct l_wait_info lwi = { 0 };
423 struct ptlrpc_svc_data d;
424 struct ptlrpc_thread *thread;
429 OBD_ALLOC(thread, sizeof(*thread));
432 init_waitqueue_head(&thread->t_ctl_waitq);
439 spin_lock_irqsave(&ws->ws_thread_lock, flags);
440 list_add(&thread->t_link, &ws->ws_threads);
441 spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
443 /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
444 * just drop the VM and FILES in ptlrpc_daemonize() right away.
446 rc = kernel_thread(cmobd_write_main, &d, CLONE_VM | CLONE_FILES);
448 CERROR("cannot start thread: %d\n", rc);
/* spawn failed: unlink and free the bookkeeping we just added */
449 spin_lock_irqsave(&ws->ws_thread_lock, flags);
450 list_del_init(&thread->t_link);
451 spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
452 OBD_FREE(thread, sizeof(*thread));
/* handshake: block until cmobd_write_main() sets SVC_RUNNING */
455 l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING, &lwi);
/* Ask one worker thread to stop, wait for it to report SVC_STOPPED,
 * then unlink and free its bookkeeping. Counterpart of
 * cmobd_start_thread(). */
461 static void cmobd_stop_thread(struct obd_device *obd,
462 struct ptlrpc_thread *thread)
464 struct cm_obd *cmobd = &obd->u.cm;
465 struct cmobd_write_service *ws = cmobd->write_srv;
466 struct l_wait_info lwi = { 0 };
470 thread->t_flags = SVC_STOPPING;
/* wake every consumer so the targeted thread re-checks t_flags */
471 wake_up_all(&ws->ws_waitq_consumer);
473 l_wait_event(thread->t_ctl_waitq, (thread->t_flags & SVC_STOPPED),
476 spin_lock_irqsave(&ws->ws_thread_lock, flags);
477 list_del(&thread->t_link);
478 spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
480 OBD_FREE(thread, sizeof(*thread));
/* Stop every worker on ws_threads. The lock is dropped around
 * cmobd_stop_thread() because that call sleeps; the loop re-checks
 * list_empty() under the lock each iteration, so concurrent removal
 * is tolerated. */
484 static void cmobd_stop_all_threads(struct obd_device *obd)
486 struct cm_obd *cmobd = &obd->u.cm;
487 struct cmobd_write_service *ws = cmobd->write_srv;
489 struct ptlrpc_thread *thread;
492 spin_lock_irqsave(&ws->ws_thread_lock, flags);
493 while (!list_empty(&ws->ws_threads)) {
494 thread = list_entry(ws->ws_threads.next,
495 struct ptlrpc_thread, t_link);
/* cmobd_stop_thread() sleeps, so it cannot run under the lock */
497 spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
498 cmobd_stop_thread(obd, thread);
499 spin_lock_irqsave(&ws->ws_thread_lock, flags);
502 spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
/* Start @num_threads workers named "<base_name>_NN". On any failure,
 * tear down every thread already started and (presumably, in the
 * elided line after 519) return the error. */
506 static int cmobd_start_n_threads(struct obd_device *obd, int num_threads,
512 for (i = 0; i < num_threads; i++) {
514 snprintf(name, sizeof(name) - 1, "%s_%02d", base_name, i);
515 rc = cmobd_start_thread(obd, name);
517 CERROR("cannot start %s thread #%d: rc %d\n", base_name,
/* all-or-nothing: undo the threads that did start */
519 cmobd_stop_all_threads(obd);
/* Tear down the write-replay service: stop all worker threads, free
 * any extents still queued (never replayed), then free the service
 * struct itself. Inverse of cmobd_init_write_srv(). */
525 void cmobd_cleanup_write_srv(struct obd_device *obd)
527 struct cm_obd *cmobd = &obd->u.cm;
528 struct list_head *pos, *n;
529 struct cmobd_extent_info *ei;
532 cmobd_stop_all_threads(obd);
/* drain extents that no worker ever picked up */
534 list_for_each_safe(pos, n, &cmobd->write_srv->ws_extents) {
535 ei = list_entry(pos, struct cmobd_extent_info, ei_link);
536 list_del_init(&ei->ei_link);
537 OBD_FREE(ei, sizeof(*ei));
539 OBD_FREE(cmobd->write_srv, sizeof(*cmobd->write_srv));
/* Allocate and initialize the write-replay service (thread list,
 * extent queue, locks, wait queues) and start CMOBD_NUM_THREADS
 * workers. Cleans up after itself if thread startup fails. */
543 int cmobd_init_write_srv(struct obd_device *obd)
545 struct cm_obd *cmobd = &obd->u.cm;
546 struct cmobd_write_service *ws;
550 OBD_ALLOC(cmobd->write_srv, sizeof(*cmobd->write_srv));
551 if (cmobd->write_srv == NULL)
553 ws = cmobd->write_srv;
555 INIT_LIST_HEAD(&ws->ws_threads);
556 spin_lock_init(&ws->ws_thread_lock);
559 INIT_LIST_HEAD(&ws->ws_extents);
560 spin_lock_init(&ws->ws_extent_lock);
/* providers wait for queue space; consumers wait for work */
562 init_waitqueue_head(&ws->ws_waitq_provider);
563 init_waitqueue_head(&ws->ws_waitq_consumer);
565 rc = cmobd_start_n_threads(obd, CMOBD_NUM_THREADS, "cm_write");
567 cmobd_cleanup_write_srv(obd);
/* Return 1 if the pending-extent queue has reached CMOBD_MAX_EXTENTS,
 * else 0. Used as the wait condition in cmobd_queue_extent(). */
572 static int extent_queue_full(struct cmobd_write_service *ws)
577 spin_lock_irqsave(&ws->ws_extent_lock, flags);
578 full = (ws->ws_nextents >= CMOBD_MAX_EXTENTS) ? 1 : 0;
579 spin_unlock_irqrestore(&ws->ws_extent_lock, flags);
/* Producer side: block until the extent queue has room, enqueue @ex,
 * bump the owning set's pending count, and wake a consumer thread.
 * NOTE(review): the re-check after re-acquiring ws_extent_lock guards
 * against another producer filling the queue between the wait and the
 * lock; the elided branch at line 599+ presumably loops/retries. */
584 static void cmobd_queue_extent(struct obd_device *obd,
585 struct cmobd_extent_info *ex)
587 struct cm_obd *cmobd = &obd->u.cm;
588 struct cmobd_write_service *ws = cmobd->write_srv;
589 struct cmobd_extent_set *set = ex->ei_set;
591 struct l_wait_info lwi = { 0 };
/* throttle producers while CMOBD_MAX_EXTENTS are already queued */
595 l_wait_event(ws->ws_waitq_provider, !extent_queue_full(ws), &lwi);
597 spin_lock_irqsave(&ws->ws_extent_lock, flags);
/* queue may have refilled since the wait condition was sampled */
598 if (ws->ws_nextents >= CMOBD_MAX_EXTENTS) {
599 spin_unlock_irqrestore(&ws->ws_extent_lock, flags);
602 list_add_tail(&ex->ei_link, &ws->ws_extents);
604 spin_unlock_irqrestore(&ws->ws_extent_lock, flags);
/* account the extent against its set (es_count++, presumably, in
 * the elided line 607) */
606 spin_lock_irqsave(&set->es_lock, flags);
608 spin_unlock_irqrestore(&set->es_lock, flags);
610 wake_up_all(&ws->ws_waitq_consumer);
/* Look up the on-disk size of object (@id, @grp) on @exp by resolving
 * its dentry under the export's lvfs context. The push/pop context
 * bracket switches to the OBD's filesystem credentials for the VFS
 * access. NOTE(review): error handling for a failed id2dentry and the
 * dentry release are elided — confirm against full source. */
615 static obd_size cmobd_id2size(struct obd_export *exp, obd_id id, obd_gr grp)
617 struct lvfs_run_ctxt saved;
618 struct dentry *de = NULL;
622 push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
624 de = obd_lvfs_id2dentry(exp, id, 0, grp);
627 size = de->d_inode->i_size;
630 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
/* Wait-condition helper: is extent set @set drained?
 * Phase 1: all queued extents processed (es_count == 0).
 * Phase 2 (the line-644 branch): additionally, all pages completed
 * (es_pages empty). Returns 1 when done, 0 otherwise. */
635 static int extent_set_done(struct cmobd_extent_set *set, int phase)
640 spin_lock_irqsave(&set->es_lock, flags);
642 done = set->es_count ? 0 : 1;
644 done = (!set->es_count && list_empty(&set->es_pages)) ? 1 : 0;
645 spin_unlock_irqrestore(&set->es_lock, flags);
/* Replay a cached write covering @ext on object @oa to the master OBD.
 * Builds a dummy lsm over the master LOV, normalizes the extent
 * (page-align start, resolve OBD_OBJECT_EOF to the real file size),
 * splits it into CMOBD_MAX_EXTENT_SZ pieces queued to the worker
 * threads, waits for the extents to be consumed, then force-flushes
 * any pages still pending and waits for full completion.
 * NOTE(review): this view runs to the end of the chunk; the final
 * return and parts of several branches are elided. */
650 int cmobd_replay_write(struct obd_device *obd, struct obdo *oa,
651 struct ldlm_extent *ext)
653 struct cm_obd *cmobd = &obd->u.cm;
654 struct lov_stripe_md *lsm = NULL;
655 struct cmobd_extent_set set;
656 struct cmobd_extent_info *ex;
657 struct l_wait_info lwi = { 0 };
658 struct list_head *pos, *n;
659 struct cmobd_async_page *cmap;
662 obd_count i, buf_count;
668 * nevertheless ost is not used anymore and lov should be always present
669 * as a object storage export, using ost is still possible (just
670 * deprecated) and we should make sure here, that this is really
673 lov = &cmobd->master_exp->exp_obd->u.lov;
/* fake stripe metadata spanning all LOV targets for the async I/O */
674 rc = cmobd_dummy_lsm(&lsm, lov->desc.ld_tgt_count, oa,
675 (__u32)lov->desc.ld_default_stripe_size);
/* stack-local extent set tracks all in-flight extents and pages
 * for this single replay call */
679 set.es_extent.start = ext->start;
680 set.es_extent.end = ext->end;
682 set.es_exp = cmobd->master_exp;
683 set.es_ext_sz = CMOBD_MAX_EXTENT_SZ;
685 memcpy(&set.es_oa, oa, sizeof(*oa));
687 INIT_LIST_HEAD(&set.es_pages);
688 spin_lock_init(&set.es_lock);
689 init_waitqueue_head(&set.es_waitq);
691 if (set.es_extent.end < set.es_extent.start) {
692 CDEBUG(D_HA, "illegal extent in write replay\n");
693 GOTO(out, rc = -EINVAL);
695 /* start of extent is extended to page boundaries */
696 set.es_extent.start -= set.es_extent.start & ~PAGE_MASK;
697 /* if the end of extent is EOF, set it as file size */
698 if (set.es_extent.end == OBD_OBJECT_EOF) {
699 set.es_extent.end = cmobd_id2size(cmobd->cache_exp,
700 oa->o_id, oa->o_gr) - 1;
/* zero-length object: nothing to replay */
701 if (set.es_extent.end <= 0)
/* carve the normalized extent into worker-sized pieces */
705 buf_count = split_extent(&set.es_extent, set.es_ext_sz);
706 for (i = 0, start = set.es_extent.start; i < buf_count;
707 i++, start += set.es_ext_sz) {
708 OBD_SLAB_ALLOC(ex, cmobd_extent_slab, SLAB_NOFS, sizeof(*ex));
710 CERROR("not enough memory\n");
714 INIT_LIST_HEAD(&ex->ei_link);
716 ex->ei_extent.start = start;
717 ex->ei_extent.end = start + set.es_ext_sz - 1;
/* clamp the last piece to the overall extent end */
718 if (ex->ei_extent.end > set.es_extent.end)
719 ex->ei_extent.end = set.es_extent.end;
/* hands ownership of ex to a worker (freed in cmobd_write_main) */
721 cmobd_queue_extent(obd, ex);
/* phase 1: wait until every queued extent has been processed */
724 l_wait_event(set.es_waitq, extent_set_done(&set, 1), &lwi);
726 /* fire remaining ios */
727 spin_lock_irqsave(&set.es_lock, flags);
728 list_for_each_safe (pos, n, &set.es_pages) {
729 cmap = list_entry(pos, struct cmobd_async_page, cmap_link);
731 /* locked pages are in flight */
732 if (PageLocked(cmap->cmap_page))
/* drop the lock across obd_set_async_flags(), which may sleep */
735 spin_unlock_irqrestore(&set.es_lock, flags);
736 rc = obd_set_async_flags(set.es_exp, set.es_lsm, NULL,
740 CERROR("cmobd set async flags failed\n");
741 spin_lock_irqsave(&set.es_lock, flags);
744 spin_unlock_irqrestore(&set.es_lock, flags);
/* phase 2: wait until every page has completed as well */
746 l_wait_event(set.es_waitq, extent_set_done(&set, 2), &lwi);
748 cmobd_free_lsm(&lsm);