Whamcloud - gitweb
- landing of b_fid after merge with b_hd_cleanup_merge.
[fs/lustre-release.git] / lustre / cmobd / cm_write.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002 Cluster File Systems, Inc. <info@clusterfs.com>
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #define DEBUG_SUBSYSTEM S_CMOBD
23
24 #include <linux/version.h>
25 #include <linux/init.h>
26 #include <linux/obd_support.h>
27 #include <linux/lustre_lib.h>
28 #include <linux/lustre_net.h>
29 #include <linux/lustre_idl.h>
30 #include <linux/obd_class.h>
31 #include <linux/lustre_mds.h>
32 #include <linux/lustre_cmobd.h>
33
34 #include <asm/div64.h>
35 #include <linux/pagemap.h>
36
37 #include "cm_internal.h"
38
39 extern kmem_cache_t *cmobd_extent_slab;
40
41 /* helper function to split an extent */
42 static obd_count split_extent(struct ldlm_extent *ext, unsigned long interval)
43 {
44         obd_count buf_count, remainder;
45         ENTRY;
46         
47         buf_count = ext->end - ext->start + 1;
48         LASSERT(buf_count > 0);
49         
50         remainder = do_div(buf_count, interval);
51         if (remainder)
52                 buf_count++;
53
54         RETURN(buf_count);
55 }
56
57 static int cmobd_ap_make_ready(void *data, int cmd)
58 {
59         struct cmobd_async_page *cmap = (struct cmobd_async_page *)data;
60         struct page *page = cmap->cmap_page;
61         ENTRY;
62         
63         if (cmd == OBD_BRW_READ)
64                 RETURN(0);
65         
66         if (TryLockPage(page))
67                 RETURN(-EAGAIN);
68
69         RETURN(0);
70 }
71
72 static int cmobd_ap_refresh_count(void *data, int cmd)
73 {
74         struct cmobd_async_page *cmap = (struct cmobd_async_page *)data;
75         struct page *page = cmap->cmap_page;
76         struct inode *inode = page->mapping->host;
77         ENTRY;
78
79         LASSERT(cmd != OBD_BRW_READ);
80
81         /* catch race with truncate */
82         if (((loff_t)page->index << PAGE_SHIFT) >= inode->i_size)
83                 RETURN(0);
84
85         /* catch sub-page write at end of file */
86         if (((loff_t)page->index << PAGE_SHIFT) + PAGE_SIZE > inode->i_size)
87                 RETURN(inode->i_size % PAGE_SIZE);
88
89         RETURN(PAGE_SIZE);
90 }
91
92 static void cmobd_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
93 {
94         struct cmobd_async_page *cmap = (struct cmobd_async_page *)data;
95         obd_flag valid_flags;
96         struct inode *inode;
97         ENTRY;
98
99         if (IS_ERR(cmap)) {
100                 EXIT;
101                 return;
102         }
103
104         inode = cmap->cmap_page->mapping->host;
105         oa->o_id = cmap->cmap_es->es_oa.o_id;
106         oa->o_gr = cmap->cmap_es->es_oa.o_gr;
107         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
108         valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
109         if (cmd == OBD_BRW_WRITE) {
110                 oa->o_valid |= OBD_MD_FLIFID;
111                 
112                 /* FIXME-UMKA: should be here some mds num and mds id? */
113                 mdc_pack_id(obdo_id(oa), inode->i_ino, 0, 
114                             inode->i_mode, 0, 0);
115                 valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
116         }
117
118         obdo_from_inode(oa, inode, valid_flags);
119
120         EXIT;
121         return;
122 }
123
124 static void cmobd_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
125 {
126         struct cmobd_async_page *cmap = (struct cmobd_async_page *)data;
127         struct cmobd_extent_set *set = cmap->cmap_es;
128         unsigned long flags;
129         struct page *page;
130         int wakeup = 0;
131         ENTRY;
132
133         page = cmap->cmap_page;
134         LASSERT(PageLocked(page));
135         
136         /* XXX */
137         if (rc)
138                 SetPageError(page);
139         
140         spin_lock_irqsave(&set->es_lock, flags);
141         LASSERT(!list_empty(&set->es_pages));
142         LASSERT(!list_empty(&cmap->cmap_link));
143         
144         list_del_init(&cmap->cmap_link);
145         if (list_empty(&set->es_pages) && !set->es_count)
146                 wakeup = 1;
147         spin_unlock_irqrestore(&set->es_lock, flags);
148
149         obd_teardown_async_page(set->es_exp, set->es_lsm, NULL, 
150                                 cmap->cmap_cookie);
151         OBD_FREE(cmap, sizeof(*cmap));
152
153         unlock_page(page);
154         page_cache_release(page);
155         
156         if (wakeup)
157                 wake_up(&set->es_waitq);
158         EXIT;
159         return;
160 }
161
162 static struct obd_async_page_ops cmobd_async_page_ops = {
163         .ap_make_ready =        cmobd_ap_make_ready,
164         .ap_refresh_count =     cmobd_ap_refresh_count,
165         .ap_fill_obdo =         cmobd_ap_fill_obdo,
166         .ap_completion =        cmobd_ap_completion,
167 };
168
169 static int cmobd_send_pages(struct obd_device *obd, 
170                             struct niobuf_local *lnb,
171                             obd_count oa_bufs,
172                             struct cmobd_extent_set *set)
173 {
174         struct cm_obd *cmobd = &obd->u.cm;
175         struct obd_export *exp = cmobd->master_exp;
176         struct cmobd_async_page *cmap = NULL;
177         obd_count i;
178         int rc = 0;
179         unsigned long flags;
180         ENTRY;
181  
182         for (i = 0; i < oa_bufs; i++, lnb++) {
183                 
184                 OBD_ALLOC(cmap, sizeof(*cmap));
185                 if (cmap == NULL) {
186                         CERROR("Not enought memory\n");
187                         rc = -ENOMEM;
188                         break;
189                 }
190                 INIT_LIST_HEAD(&cmap->cmap_link);
191                 cmap->cmap_page = lnb->page;
192                 cmap->cmap_es = set;
193                         
194                 rc = obd_prep_async_page(exp, set->es_lsm, NULL, lnb->page,
195                                          lnb->offset, &cmobd_async_page_ops, 
196                                          cmap, &cmap->cmap_cookie);
197                 if (rc) {
198                         CERROR("cmobd prep async page failed page(%p) rc(%d)\n", 
199                                lnb->page, rc);
200                         OBD_FREE(cmap, sizeof(*cmap));
201                         break;
202                 }
203
204                 LASSERT(cmap->cmap_page);
205                 LASSERT(!PageLocked(cmap->cmap_page));
206                 LASSERT(Page_Uptodate(cmap->cmap_page));
207                 page_cache_get(cmap->cmap_page);
208
209                 spin_lock_irqsave(&set->es_lock, flags);
210                 list_add_tail(&cmap->cmap_link, &set->es_pages);
211                 spin_unlock_irqrestore(&set->es_lock, flags);
212                 
213                 rc = obd_queue_async_io(exp, set->es_lsm, NULL, cmap->cmap_cookie,
214                                         OBD_BRW_WRITE, 0, 0, 0, 0);
215                 if (rc) {  /* try sync io */
216                         struct obd_io_group *oig;
217                         
218                         spin_lock_irqsave(&set->es_lock, flags);
219                         list_del_init(&cmap->cmap_link);
220                         spin_unlock_irqrestore(&set->es_lock, flags);
221
222                         lock_page(cmap->cmap_page);
223                         
224                         rc = oig_init(&oig);
225                         if (rc)
226                                 GOTO(free_page, rc);
227
228                         rc = obd_queue_group_io(exp, set->es_lsm, NULL, oig,
229                                                 cmap->cmap_cookie,
230                                                 OBD_BRW_WRITE, 0, lnb->len, 0,
231                                                 ASYNC_READY | ASYNC_URGENT |
232                                                 ASYNC_COUNT_STABLE |
233                                                 ASYNC_GROUP_SYNC);
234
235                         if (rc)
236                                 GOTO(free_oig, rc);
237
238                         rc = obd_trigger_group_io(exp, set->es_lsm, NULL, oig);
239                         if (rc)
240                                 GOTO(free_oig, rc);
241
242                         rc = oig_wait(oig);
243 free_oig:
244                         oig_release(oig);
245 free_page:
246                         unlock_page(cmap->cmap_page);
247                         page_cache_release(cmap->cmap_page);
248                         obd_teardown_async_page(exp, set->es_lsm, NULL, 
249                                                 cmap->cmap_cookie);
250                         OBD_FREE(cmap, sizeof(*cmap));
251                         if (rc) {
252                                 CERROR("cmobd sync io failed\n");
253                                 break;
254                         }
255                 }
256         }
257         RETURN(rc);
258 }
259
260 static int cmobd_write_extent(struct obd_device *obd, 
261                               struct cmobd_extent_info *ei)
262 {
263         struct cmobd_extent_set *set = ei->ei_set;
264         struct cm_obd *cmobd = &obd->u.cm;
265         unsigned long flags;
266         struct obd_ioobj ioo;
267         struct niobuf_local *lnb;
268         struct niobuf_remote *rnb;
269         obd_count i, oa_bufs;
270         struct obdo *oa;
271         obd_off offset;
272         int ret, rc = 0, wakeup = 0;
273         ENTRY;
274
275         oa_bufs = split_extent(&ei->ei_extent, PAGE_SIZE);
276         LASSERT(oa_bufs > 0);
277
278         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
279         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
280         oa = obdo_alloc();
281         
282         if (lnb == NULL || rnb == NULL || oa == NULL)
283                 GOTO(out, rc = -ENOMEM);
284
285         LASSERT(ei->ei_extent.end >= ei->ei_extent.start);
286         LASSERT((ei->ei_extent.start & (PAGE_SIZE -1)) == 0);
287         
288         for (i = 0, offset = ei->ei_extent.start; i < oa_bufs; 
289              i++, offset += PAGE_SIZE) {
290                 rnb[i].offset = offset;
291                 rnb[i].len = MIN(PAGE_SIZE, ei->ei_extent.end - offset + 1);
292         }
293
294         memcpy(oa, &set->es_oa, sizeof(*oa));
295         obdo_to_ioobj(oa, &ioo);
296         ioo.ioo_bufcnt = oa_bufs;
297
298         ret = obd_preprw(OBD_BRW_READ, cmobd->cache_exp, oa, 1, &ioo, 
299                          oa_bufs, rnb, lnb, NULL);
300         if (ret)
301                 GOTO(out, rc = ret);
302
303         rc = cmobd_send_pages(obd, lnb, oa_bufs, set);
304         if (rc)
305                 CERROR("cmobd_send_pages failed %d\n", rc);
306
307         rc = obd_commitrw(OBD_BRW_READ, cmobd->cache_exp, oa, 1, &ioo,
308                           oa_bufs, lnb, NULL, ret);
309
310         /* countdown and wake up */
311         spin_lock_irqsave(&set->es_lock, flags);
312         LASSERT(set->es_count);
313         set->es_count--;
314         if (!set->es_count)
315                 wakeup = 1;
316         spin_unlock_irqrestore(&set->es_lock, flags);
317
318         if (wakeup)
319                 wake_up(&set->es_waitq);
320
321 out: 
322         if (lnb)
323                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
324         if (rnb)
325                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
326         if (oa)
327                 obdo_free(oa);
328
329         RETURN(rc);
330 }
331
332 static struct cmobd_extent_info* get_next_ei(struct cmobd_write_service *ws)
333 {
334         struct cmobd_extent_info *ei = NULL;
335         unsigned long flags;
336         int wakeup = 0;
337
338         spin_lock_irqsave(&ws->ws_extent_lock, flags);
339         if (!list_empty(&ws->ws_extents)) {
340                 ei = list_entry(ws->ws_extents.next, 
341                                 struct cmobd_extent_info, ei_link);
342                 list_del_init(&ei->ei_link);
343                 ws->ws_nextents--;
344                 if (ws->ws_nextents < CMOBD_MAX_EXTENTS)
345                         wakeup = 1;
346         }
347         spin_unlock_irqrestore(&ws->ws_extent_lock, flags);
348
349         if (wakeup)
350                 wake_up_all(&ws->ws_waitq_provider);
351
352         return ei;
353 }
354        
355 static int cmobd_write_main(void *arg)
356 {
357         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
358         struct ptlrpc_thread   *thread = data->thread;
359         struct obd_device *obd = data->dev;
360         struct cm_obd *cmobd = &obd->u.cm;
361         struct cmobd_write_service *ws = cmobd->write_srv;
362         struct cmobd_extent_info *extent = NULL;
363         unsigned long flags;
364         int rc;
365         ENTRY;
366
367         lock_kernel();
368         /* vv ptlrpc_daemonize(); vv */
369         exit_mm(current);
370
371         current->session = 1;
372         current->pgrp = 1;
373         current->tty = NULL;
374
375         exit_files(current);
376         reparent_to_init();
377         /* ^^ ptlrpc_daemonize(); ^^ */
378
379         SIGNAL_MASK_LOCK(current, flags);
380         sigfillset(&current->blocked);
381         RECALC_SIGPENDING;
382         SIGNAL_MASK_UNLOCK(current, flags);
383
384         LASSERTF(strlen(data->name) < sizeof(current->comm),
385                  "name %d > len %d\n",strlen(data->name),sizeof(current->comm));
386         THREAD_NAME(current->comm, sizeof(current->comm) - 1, "%s", data->name);
387
388         unlock_kernel();
389
390         thread->t_flags = SVC_RUNNING;
391         wake_up(&thread->t_ctl_waitq);
392
393         /* Record that the thread is running */
394         spin_lock_irqsave(&ws->ws_thread_lock, flags);
395         ws->ws_nthreads++;
396         spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
397
398         while ((thread->t_flags & SVC_STOPPING) == 0) {
399                 struct l_wait_info lwi = { 0 };
400                                   
401                 l_wait_event_exclusive(ws->ws_waitq_consumer,
402                                        ((thread->t_flags & SVC_STOPPING) ||
403                                         ((extent = get_next_ei(ws)) != 
404                                           NULL)),
405                                        &lwi);
406                 if (extent == NULL)
407                         continue;
408                 rc = cmobd_write_extent(obd, extent);
409                 if (rc)
410                         CERROR("write extent failed rc=%d\n", rc);
411                 OBD_SLAB_FREE(extent, cmobd_extent_slab, sizeof(*extent));
412                 extent = NULL;
413         }
414  
415         thread->t_flags = SVC_STOPPED;
416         wake_up(&thread->t_ctl_waitq);
417        
418         spin_lock_irqsave(&ws->ws_thread_lock, flags);
419         ws->ws_nthreads--;                    /* must know immediately */
420         spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
421
422         RETURN(0);
423 }
424
425 /* functions for manipulating cmobd write replay threads, similar with 
426  * ptlrpc threads functions */
427 static int cmobd_start_thread(struct obd_device *obd, char *name)
428 {
429         struct cm_obd *cmobd = &obd->u.cm;
430         struct cmobd_write_service *ws = cmobd->write_srv;
431         struct l_wait_info lwi = { 0 };
432         struct ptlrpc_svc_data d;
433         struct ptlrpc_thread *thread;
434         unsigned long flags;
435         int rc;
436         ENTRY;
437
438         OBD_ALLOC(thread, sizeof(*thread));
439         if (thread == NULL)
440                 RETURN(-ENOMEM);
441         init_waitqueue_head(&thread->t_ctl_waitq);
442         
443         d.dev = obd;
444         d.svc = NULL;
445         d.name = name;
446         d.thread = thread;
447
448         spin_lock_irqsave(&ws->ws_thread_lock, flags);
449         list_add(&thread->t_link, &ws->ws_threads);
450         spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
451
452         /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
453          * just drop the VM and FILES in ptlrpc_daemonize() right away.
454          */
455         rc = kernel_thread(cmobd_write_main, &d, CLONE_VM | CLONE_FILES);
456         if (rc < 0) {
457                 CERROR("cannot start thread: %d\n", rc);
458                 spin_lock_irqsave(&ws->ws_thread_lock, flags);
459                 list_del_init(&thread->t_link);
460                 spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
461                 OBD_FREE(thread, sizeof(*thread));
462                 RETURN(rc);
463         }
464         l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING, &lwi);
465
466         RETURN(0);
467
468 }
469
470 static void cmobd_stop_thread(struct obd_device *obd, 
471                               struct ptlrpc_thread *thread)
472 {
473         struct cm_obd *cmobd = &obd->u.cm;
474         struct cmobd_write_service *ws = cmobd->write_srv;
475         struct l_wait_info lwi = { 0 };
476         unsigned long flags;
477         ENTRY;
478
479         thread->t_flags = SVC_STOPPING;
480         wake_up_all(&ws->ws_waitq_consumer);
481
482         l_wait_event(thread->t_ctl_waitq, (thread->t_flags & SVC_STOPPED),
483                      &lwi);
484
485         spin_lock_irqsave(&ws->ws_thread_lock, flags);
486         list_del(&thread->t_link);
487         spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
488         
489         OBD_FREE(thread, sizeof(*thread));
490         EXIT;
491 }
492
493 static void cmobd_stop_all_threads(struct obd_device *obd)
494 {
495         struct cm_obd *cmobd = &obd->u.cm;
496         struct cmobd_write_service *ws = cmobd->write_srv;
497         unsigned long flags;
498         struct ptlrpc_thread *thread;
499         ENTRY;
500
501         spin_lock_irqsave(&ws->ws_thread_lock, flags);
502         while (!list_empty(&ws->ws_threads)) {
503                 thread = list_entry(ws->ws_threads.next, 
504                                     struct ptlrpc_thread, t_link);
505
506                 spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
507                 cmobd_stop_thread(obd, thread);
508                 spin_lock_irqsave(&ws->ws_thread_lock, flags);
509         }
510
511         spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
512         EXIT;
513 }
514
515 static int cmobd_start_n_threads(struct obd_device *obd, int num_threads, 
516                                  char *base_name)
517 {
518         int i, rc = 0;
519         ENTRY;
520
521         for (i = 0; i < num_threads; i++) {
522                 char name[32];
523                 snprintf(name, sizeof(name) - 1, "%s_%02d", base_name, i);
524                 rc = cmobd_start_thread(obd, name);
525                 if (rc) {
526                         CERROR("cannot start %s thread #%d: rc %d\n", base_name,
527                                i, rc);
528                         cmobd_stop_all_threads(obd);
529                 }
530         }
531         RETURN(rc);
532 }
533
534 void cmobd_cleanup_write_srv(struct obd_device *obd)
535 {
536         struct cm_obd *cmobd = &obd->u.cm;
537         struct list_head *pos, *n;
538         struct cmobd_extent_info *ei;
539         ENTRY;
540         
541         cmobd_stop_all_threads(obd);
542         
543         list_for_each_safe(pos, n, &cmobd->write_srv->ws_extents) {
544                 ei = list_entry(pos, struct cmobd_extent_info, ei_link);
545                 list_del_init(&ei->ei_link);
546                 OBD_FREE(ei, sizeof(*ei));
547         }
548         OBD_FREE(cmobd->write_srv, sizeof(*cmobd->write_srv));
549         EXIT;
550 }
551
552 int cmobd_init_write_srv(struct obd_device *obd)
553 {
554         struct cm_obd *cmobd = &obd->u.cm;
555         struct cmobd_write_service *ws;
556         int rc;
557         ENTRY;
558
559         OBD_ALLOC(cmobd->write_srv, sizeof(*cmobd->write_srv));
560         if (cmobd->write_srv == NULL)
561                 RETURN(-ENOMEM);
562         ws = cmobd->write_srv;
563         
564         INIT_LIST_HEAD(&ws->ws_threads);
565         spin_lock_init(&ws->ws_thread_lock);
566         ws->ws_nthreads = 0;
567
568         INIT_LIST_HEAD(&ws->ws_extents);
569         spin_lock_init(&ws->ws_extent_lock);
570         ws->ws_nextents = 0;
571         init_waitqueue_head(&ws->ws_waitq_provider);
572         init_waitqueue_head(&ws->ws_waitq_consumer);
573
574         rc = cmobd_start_n_threads(obd, CMOBD_NUM_THREADS, "cm_write");
575         if (rc) 
576                 cmobd_cleanup_write_srv(obd);
577         
578         RETURN(rc);
579 }
580
581 static int extent_queue_full(struct cmobd_write_service *ws)
582 {
583         unsigned long flags;
584         int full = 0;
585         
586         spin_lock_irqsave(&ws->ws_extent_lock, flags);
587         full = (ws->ws_nextents >= CMOBD_MAX_EXTENTS) ? 1 : 0;
588         spin_unlock_irqrestore(&ws->ws_extent_lock, flags);
589
590         return full;
591 }
592         
593 static void cmobd_queue_extent(struct obd_device *obd, 
594                                struct cmobd_extent_info *ex)
595 {
596         struct cm_obd *cmobd = &obd->u.cm;
597         struct cmobd_write_service *ws = cmobd->write_srv;
598         struct cmobd_extent_set *set = ex->ei_set;
599         unsigned long flags;
600         struct l_wait_info lwi = { 0 };
601         ENTRY;
602
603 wait:
604         l_wait_event(ws->ws_waitq_provider, !extent_queue_full(ws), &lwi);
605         
606         spin_lock_irqsave(&ws->ws_extent_lock, flags);
607         if (ws->ws_nextents >= CMOBD_MAX_EXTENTS) {
608                 spin_unlock_irqrestore(&ws->ws_extent_lock, flags);
609                 goto wait;
610         }
611         list_add_tail(&ex->ei_link, &ws->ws_extents);
612         ws->ws_nextents++;
613         spin_unlock_irqrestore(&ws->ws_extent_lock, flags);
614                 
615         spin_lock_irqsave(&set->es_lock, flags);
616         set->es_count++;
617         spin_unlock_irqrestore(&set->es_lock, flags);        
618
619         wake_up_all(&ws->ws_waitq_consumer);
620
621         EXIT;
622
623
624 static obd_size cmobd_id2size(struct obd_export *exp, obd_id id, obd_gr grp)
625 {
626         struct lvfs_run_ctxt saved;
627         struct dentry *de = NULL;
628         obd_size size;
629         ENTRY;
630         
631         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
632         
633         de = obd_lvfs_id2dentry(exp, id, 0, grp);
634         LASSERT(de);
635
636         size = de->d_inode->i_size;
637
638         dput(de);
639         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
640
641         RETURN(size);
642 }
643
644 static int extent_set_done(struct cmobd_extent_set *set, int phase)
645 {
646         int done = 0;
647         unsigned long flags;
648
649         spin_lock_irqsave(&set->es_lock, flags);
650         if (phase == 1)
651                 done = set->es_count ? 0 : 1;
652         else if (phase == 2) 
653                 done = (!set->es_count && list_empty(&set->es_pages)) ? 1 : 0;
654         spin_unlock_irqrestore(&set->es_lock, flags);
655
656         return done;
657 }
658
659 int cmobd_replay_write(struct obd_device *obd, struct obdo *oa, 
660                        struct ldlm_extent *ext)
661 {
662         struct cm_obd *cmobd = &obd->u.cm;
663         struct lov_obd *lov = &cmobd->master_obd->u.lov;
664         struct lov_stripe_md *lsm = NULL;
665         struct cmobd_extent_set set;
666         struct cmobd_extent_info *ex;
667         struct l_wait_info lwi = { 0 };
668         struct list_head *pos, *n;
669         struct cmobd_async_page *cmap;
670         unsigned long flags;
671         obd_count i, buf_count;
672         obd_off start;
673         int rc = 0;
674         ENTRY;
675
676         rc = cmobd_dummy_lsm(&lsm, lov->desc.ld_tgt_count, oa,
677                              (__u32)lov->desc.ld_default_stripe_size);
678         if (rc)
679                 RETURN(-ENOMEM);
680
681         set.es_extent.start = ext->start;
682         set.es_extent.end = ext->end;
683         set.es_lsm = lsm;
684         set.es_exp = cmobd->master_exp;
685         set.es_ext_sz = CMOBD_MAX_EXTENT_SZ;
686         set.es_count = 0;
687         memcpy(&set.es_oa, oa, sizeof(*oa));
688         
689         INIT_LIST_HEAD(&set.es_pages);
690         spin_lock_init(&set.es_lock);
691         init_waitqueue_head(&set.es_waitq);
692         
693         if (set.es_extent.end < set.es_extent.start) {
694                 CDEBUG(D_HA, "illegal extent in write replay\n");
695                 GOTO(out, rc = -EINVAL);
696         }
697         /* start of extent is extended to page boundaries */
698         set.es_extent.start -= set.es_extent.start & ~PAGE_MASK;
699         /* if the end of extent is EOF, set it as file size */
700         if (set.es_extent.end == OBD_OBJECT_EOF) {
701                 set.es_extent.end = cmobd_id2size(cmobd->cache_exp, 
702                                                   oa->o_id, oa->o_gr) - 1;
703                 if (set.es_extent.end <= 0)
704                         GOTO(out, rc = 0);
705         }
706         
707         buf_count = split_extent(&set.es_extent, set.es_ext_sz);
708         for (i = 0, start = set.es_extent.start; i < buf_count; 
709              i++, start += set.es_ext_sz) {
710                 OBD_SLAB_ALLOC(ex, cmobd_extent_slab, SLAB_NOFS, sizeof(*ex));
711                 if (ex == NULL) {
712                         CERROR("not enough memory\n");
713                         break;
714                 }
715
716                 INIT_LIST_HEAD(&ex->ei_link);
717                 ex->ei_set = &set;
718                 ex->ei_extent.start = start;
719                 ex->ei_extent.end = start + set.es_ext_sz - 1;
720                 if (ex->ei_extent.end > set.es_extent.end)
721                         ex->ei_extent.end = set.es_extent.end;
722
723                 cmobd_queue_extent(obd, ex);
724         }
725         
726         l_wait_event(set.es_waitq, extent_set_done(&set, 1), &lwi);
727         
728         /* fire remaining ios */
729         spin_lock_irqsave(&set.es_lock, flags);
730         list_for_each_safe (pos, n, &set.es_pages) {
731                 cmap = list_entry(pos, struct cmobd_async_page, cmap_link);
732
733                 /* locked pages are in flight */
734                 if (PageLocked(cmap->cmap_page))
735                         continue;
736                 
737                 spin_unlock_irqrestore(&set.es_lock, flags);
738                 rc = obd_set_async_flags(set.es_exp, set.es_lsm, NULL, 
739                                          cmap->cmap_cookie, 
740                                          ASYNC_URGENT);
741                 if (rc)
742                         CERROR("cmobd set async flags failed\n");
743                 spin_lock_irqsave(&set.es_lock, flags);
744                 break;
745         }
746         spin_unlock_irqrestore(&set.es_lock, flags);
747
748         l_wait_event(set.es_waitq, extent_set_done(&set, 2), &lwi);
749 out:
750         cmobd_free_lsm(&lsm);
751         RETURN(rc);
752 }