lustre/cmobd/cmobd_write.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002 Cluster File Systems, Inc. <info@clusterfs.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#define DEBUG_SUBSYSTEM S_CMOBD

#include <linux/version.h>
#include <linux/init.h>
#include <linux/obd_support.h>
#include <linux/lustre_lib.h>
#include <linux/lustre_net.h>
#include <linux/lustre_idl.h>
#include <linux/obd_class.h>
#include <linux/lustre_mds.h>
#include <linux/lustre_cmobd.h>

#include <asm/div64.h>
#include <linux/pagemap.h>

#include "cmobd_internal.h"

extern kmem_cache_t *cmobd_extent_slab;

/* helper to split an extent: returns the number of @interval-sized chunks
 * needed to cover [ext->start, ext->end] */
static obd_count split_extent(struct ldlm_extent *ext, unsigned long interval)
{
        obd_count buf_count, remainder;
        ENTRY;

        buf_count = ext->end - ext->start + 1;
        LASSERT(buf_count > 0);

        remainder = do_div(buf_count, interval);
        if (remainder)
                buf_count++;

        RETURN(buf_count);
}

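/* ->ap_make_ready() callback: for writes, try to lock the page before it is
 * sent; return -EAGAIN if it is already locked so the caller retries later. */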
static int cmobd_ap_make_ready(void *data, int cmd)
{
        struct cmobd_async_page *cmap = (struct cmobd_async_page *)data;
        struct page *page = cmap->cmap_page;
        ENTRY;

        if (cmd == OBD_BRW_READ)
                RETURN(0);

        if (TryLockPage(page))
                RETURN(-EAGAIN);

        RETURN(0);
}

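/* ->ap_refresh_count() callback: return how many bytes of the page are still
 * valid to write, trimming for a racing truncate or a partial page at EOF. */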
static int cmobd_ap_refresh_count(void *data, int cmd)
{
        struct cmobd_async_page *cmap = (struct cmobd_async_page *)data;
        struct page *page = cmap->cmap_page;
        struct inode *inode = page->mapping->host;
        ENTRY;

        LASSERT(cmd != OBD_BRW_READ);

        /* catch race with truncate */
        if (((loff_t)page->index << PAGE_SHIFT) >= inode->i_size)
                RETURN(0);

        /* catch sub-page write at end of file */
        if (((loff_t)page->index << PAGE_SHIFT) + PAGE_SIZE > inode->i_size)
                RETURN(inode->i_size % PAGE_SIZE);

        RETURN(PAGE_SIZE);
}

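/* ->ap_fill_obdo() callback: fill the obdo for this I/O with the object id
 * and group from the extent set, plus attributes copied from the inode. */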
static void cmobd_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
{
        struct cmobd_async_page *cmap = (struct cmobd_async_page *)data;
        obd_flag valid_flags;
        struct inode *inode;
        ENTRY;

        if (IS_ERR(cmap)) {
                EXIT;
                return;
        }

        inode = cmap->cmap_page->mapping->host;
        oa->o_id = cmap->cmap_es->es_oa.o_id;
        oa->o_gr = cmap->cmap_es->es_oa.o_gr;
        oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
        valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
        if (cmd == OBD_BRW_WRITE) {
                oa->o_valid |= OBD_MD_FLIFID;
                mdc_pack_fid(obdo_fid(oa), inode->i_ino, 0, inode->i_mode);

                valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
        }

        obdo_from_inode(oa, inode, valid_flags);

        EXIT;
        return;
}

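/* ->ap_completion() callback: an async write on the page finished.  Unhook
 * the page from its extent set, tear down the async page, release the page
 * and wake up the waiter once the set is fully drained. */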
static void cmobd_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
{
        struct cmobd_async_page *cmap = (struct cmobd_async_page *)data;
        struct cmobd_extent_set *set = cmap->cmap_es;
        unsigned long flags;
        struct page *page;
        int wakeup = 0;
        ENTRY;

        page = cmap->cmap_page;
        LASSERT(PageLocked(page));

        /* XXX */
        if (rc)
                SetPageError(page);

        spin_lock_irqsave(&set->es_lock, flags);
        LASSERT(!list_empty(&set->es_pages));
        LASSERT(!list_empty(&cmap->cmap_link));

        list_del_init(&cmap->cmap_link);
        if (list_empty(&set->es_pages) && !set->es_count)
                wakeup = 1;
        spin_unlock_irqrestore(&set->es_lock, flags);

        obd_teardown_async_page(set->es_exp, set->es_lsm, NULL,
                                cmap->cmap_cookie);
        OBD_FREE(cmap, sizeof(*cmap));

        unlock_page(page);
        page_cache_release(page);

        if (wakeup)
                wake_up(&set->es_waitq);
        EXIT;
        return;
}

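/* async page callbacks used when replaying cached writes to the master OBD */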
static struct obd_async_page_ops cmobd_async_page_ops = {
        .ap_make_ready =        cmobd_ap_make_ready,
        .ap_refresh_count =     cmobd_ap_refresh_count,
        .ap_fill_obdo =         cmobd_ap_fill_obdo,
        .ap_completion =        cmobd_ap_completion,
};

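/* queue @oa_bufs local niobuf pages as async writes to the master export;
 * if async queueing fails for a page, fall back to a synchronous group I/O
 * on that page */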
static int cmobd_send_pages(struct obd_device *obd,
                            struct niobuf_local *lnb,
                            obd_count oa_bufs,
                            struct cmobd_extent_set *set)
{
        struct cache_manager_obd *cmobd = &obd->u.cmobd;
        struct obd_export *exp = cmobd->cm_master_exp;
        struct cmobd_async_page *cmap = NULL;
        obd_count i;
        int rc = 0;
        unsigned long flags;
        ENTRY;

        for (i = 0; i < oa_bufs; i++, lnb++) {

                OBD_ALLOC(cmap, sizeof(*cmap));
                if (cmap == NULL) {
                        CERROR("Not enough memory\n");
                        rc = -ENOMEM;
                        break;
                }
                INIT_LIST_HEAD(&cmap->cmap_link);
                cmap->cmap_page = lnb->page;
                cmap->cmap_es = set;

                rc = obd_prep_async_page(exp, set->es_lsm, NULL, lnb->page,
                                         lnb->offset, &cmobd_async_page_ops,
                                         cmap, &cmap->cmap_cookie);
                if (rc) {
                        CERROR("cmobd prep async page failed page(%p) rc(%d)\n",
                               lnb->page, rc);
                        OBD_FREE(cmap, sizeof(*cmap));
                        break;
                }

                LASSERT(cmap->cmap_page);
                LASSERT(!PageLocked(cmap->cmap_page));
                LASSERT(Page_Uptodate(cmap->cmap_page));
                page_cache_get(cmap->cmap_page);

                spin_lock_irqsave(&set->es_lock, flags);
                list_add_tail(&cmap->cmap_link, &set->es_pages);
                spin_unlock_irqrestore(&set->es_lock, flags);

                rc = obd_queue_async_io(exp, set->es_lsm, NULL, cmap->cmap_cookie,
                                        OBD_BRW_WRITE, 0, 0, 0, 0);
                if (rc) {  /* try sync io */
                        struct obd_io_group *oig;

                        spin_lock_irqsave(&set->es_lock, flags);
                        list_del_init(&cmap->cmap_link);
                        spin_unlock_irqrestore(&set->es_lock, flags);

                        lock_page(cmap->cmap_page);

                        rc = oig_init(&oig);
                        if (rc)
                                GOTO(free_page, rc);

                        rc = obd_queue_group_io(exp, set->es_lsm, NULL, oig,
                                                cmap->cmap_cookie,
                                                OBD_BRW_WRITE, 0, lnb->len, 0,
                                                ASYNC_READY | ASYNC_URGENT |
                                                ASYNC_COUNT_STABLE |
                                                ASYNC_GROUP_SYNC);

                        if (rc)
                                GOTO(free_oig, rc);

                        rc = obd_trigger_group_io(exp, set->es_lsm, NULL, oig);
                        if (rc)
                                GOTO(free_oig, rc);

                        rc = oig_wait(oig);
free_oig:
                        oig_release(oig);
free_page:
                        unlock_page(cmap->cmap_page);
                        page_cache_release(cmap->cmap_page);
                        obd_teardown_async_page(exp, set->es_lsm, NULL,
                                                cmap->cmap_cookie);
                        OBD_FREE(cmap, sizeof(*cmap));
                        if (rc) {
                                CERROR("cmobd sync io failed\n");
                                break;
                        }
                }
        }
        RETURN(rc);
}

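/* replay one extent: read its pages from the cache OBD with obd_preprw(),
 * send them to the master with cmobd_send_pages(), commit the read, then
 * drop this extent's reference on its extent set */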
static int cmobd_write_extent(struct obd_device *obd,
                              struct cmobd_extent_info *ei)
{
        struct cmobd_extent_set *set = ei->ei_set;
        struct cache_manager_obd *cmobd = &obd->u.cmobd;
        unsigned long flags;
        struct obd_ioobj ioo;
        struct niobuf_local *lnb;
        struct niobuf_remote *rnb;
        obd_count i, oa_bufs;
        struct obdo *oa;
        obd_off offset;
        int ret, rc = 0, wakeup = 0;
        ENTRY;

        oa_bufs = split_extent(&ei->ei_extent, PAGE_SIZE);
        LASSERT(oa_bufs > 0);

        OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
        OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
        oa = obdo_alloc();

        if (lnb == NULL || rnb == NULL || oa == NULL)
                GOTO(out, rc = -ENOMEM);

        LASSERT(ei->ei_extent.end >= ei->ei_extent.start);
        LASSERT((ei->ei_extent.start & (PAGE_SIZE - 1)) == 0);

        for (i = 0, offset = ei->ei_extent.start; i < oa_bufs;
             i++, offset += PAGE_SIZE) {
                rnb[i].offset = offset;
                rnb[i].len = MIN(PAGE_SIZE, ei->ei_extent.end - offset + 1);
        }

        memcpy(oa, &set->es_oa, sizeof(*oa));
        obdo_to_ioobj(oa, &ioo);
        ioo.ioo_bufcnt = oa_bufs;

        ret = obd_preprw(OBD_BRW_READ, cmobd->cm_cache_exp, oa, 1, &ioo,
                         oa_bufs, rnb, lnb, NULL);
        if (ret)
                GOTO(out, rc = ret);

        rc = cmobd_send_pages(obd, lnb, oa_bufs, set);
        if (rc)
                CERROR("cmobd_send_pages failed %d\n", rc);

        rc = obd_commitrw(OBD_BRW_READ, cmobd->cm_cache_exp, oa, 1, &ioo,
                          oa_bufs, lnb, NULL, ret);

        /* countdown and wake up */
        spin_lock_irqsave(&set->es_lock, flags);
        LASSERT(set->es_count);
        set->es_count--;
        if (!set->es_count)
                wakeup = 1;
        spin_unlock_irqrestore(&set->es_lock, flags);

        if (wakeup)
                wake_up(&set->es_waitq);

out:
        if (lnb)
                OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
        if (rnb)
                OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
        if (oa)
                obdo_free(oa);

        RETURN(rc);
}

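/* dequeue the next extent from the write service; wake providers that may be
 * blocked on a full queue once it drops below CMOBD_MAX_EXTENTS */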
static struct cmobd_extent_info* get_next_ei(struct cmobd_write_service *ws)
{
        struct cmobd_extent_info *ei = NULL;
        unsigned long flags;
        int wakeup = 0;

        spin_lock_irqsave(&ws->ws_extent_lock, flags);
        if (!list_empty(&ws->ws_extents)) {
                ei = list_entry(ws->ws_extents.next,
                                struct cmobd_extent_info, ei_link);
                list_del_init(&ei->ei_link);
                ws->ws_nextents--;
                if (ws->ws_nextents < CMOBD_MAX_EXTENTS)
                        wakeup = 1;
        }
        spin_unlock_irqrestore(&ws->ws_extent_lock, flags);

        if (wakeup)
                wake_up_all(&ws->ws_waitq_provider);

        return ei;
}

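/* main loop of a write replay thread: daemonize, then consume extents from
 * the write service queue and replay them until asked to stop */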
static int cmobd_write_main(void *arg)
{
        struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
        struct ptlrpc_thread   *thread = data->thread;
        struct obd_device *obd = data->dev;
        struct cache_manager_obd *cmobd = &obd->u.cmobd;
        struct cmobd_write_service *ws = cmobd->cm_write_srv;
        struct cmobd_extent_info *extent = NULL;
        unsigned long flags;
        int rc;
        ENTRY;

        lock_kernel();

        ptlrpc_daemonize();

        SIGNAL_MASK_LOCK(current, flags);
        sigfillset(&current->blocked);
        RECALC_SIGPENDING;
        SIGNAL_MASK_UNLOCK(current, flags);

        LASSERTF(strlen(data->name) < sizeof(current->comm),
                 "name %d > len %d\n",strlen(data->name),sizeof(current->comm));
        THREAD_NAME(current->comm, sizeof(current->comm) - 1, "%s", data->name);

        unlock_kernel();

        thread->t_flags = SVC_RUNNING;
        wake_up(&thread->t_ctl_waitq);

        /* Record that the thread is running */
        spin_lock_irqsave(&ws->ws_thread_lock, flags);
        ws->ws_nthreads++;
        spin_unlock_irqrestore(&ws->ws_thread_lock, flags);

        while ((thread->t_flags & SVC_STOPPING) == 0) {
                struct l_wait_info lwi = { 0 };

                l_wait_event_exclusive(ws->ws_waitq_consumer,
                                       ((thread->t_flags & SVC_STOPPING) ||
                                        ((extent = get_next_ei(ws)) !=
                                          NULL)),
                                       &lwi);
                if (extent == NULL)
                        continue;
                rc = cmobd_write_extent(obd, extent);
                if (rc)
                        CERROR("write extent failed rc=%d\n", rc);
                OBD_SLAB_FREE(extent, cmobd_extent_slab, sizeof(*extent));
                extent = NULL;
        }

        thread->t_flags = SVC_STOPPED;
        wake_up(&thread->t_ctl_waitq);

        spin_lock_irqsave(&ws->ws_thread_lock, flags);
        ws->ws_nthreads--;                    /* must know immediately */
        spin_unlock_irqrestore(&ws->ws_thread_lock, flags);

        RETURN(0);
}

/* functions for manipulating cmobd write replay threads, similar to the
 * ptlrpc thread functions */
static int cmobd_start_thread(struct obd_device *obd, char *name)
{
        struct cache_manager_obd *cmobd = &obd->u.cmobd;
        struct cmobd_write_service *ws = cmobd->cm_write_srv;
        struct l_wait_info lwi = { 0 };
        struct ptlrpc_svc_data d;
        struct ptlrpc_thread *thread;
        unsigned long flags;
        int rc;
        ENTRY;

        OBD_ALLOC(thread, sizeof(*thread));
        if (thread == NULL)
                RETURN(-ENOMEM);
        init_waitqueue_head(&thread->t_ctl_waitq);

        d.dev = obd;
        d.svc = NULL;
        d.name = name;
        d.thread = thread;

        spin_lock_irqsave(&ws->ws_thread_lock, flags);
        list_add(&thread->t_link, &ws->ws_threads);
        spin_unlock_irqrestore(&ws->ws_thread_lock, flags);

        /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
         * just drop the VM and FILES in ptlrpc_daemonize() right away.
         */
        rc = kernel_thread(cmobd_write_main, &d, CLONE_VM | CLONE_FILES);
        if (rc < 0) {
                CERROR("cannot start thread: %d\n", rc);
                spin_lock_irqsave(&ws->ws_thread_lock, flags);
                list_del_init(&thread->t_link);
                spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
                OBD_FREE(thread, sizeof(*thread));
                RETURN(rc);
        }
        l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING, &lwi);

        RETURN(0);
}

static void cmobd_stop_thread(struct obd_device *obd,
                              struct ptlrpc_thread *thread)
{
        struct cache_manager_obd *cmobd = &obd->u.cmobd;
        struct cmobd_write_service *ws = cmobd->cm_write_srv;
        struct l_wait_info lwi = { 0 };
        unsigned long flags;
        ENTRY;

        thread->t_flags = SVC_STOPPING;
        wake_up_all(&ws->ws_waitq_consumer);

        l_wait_event(thread->t_ctl_waitq, (thread->t_flags & SVC_STOPPED),
                     &lwi);

        spin_lock_irqsave(&ws->ws_thread_lock, flags);
        list_del(&thread->t_link);
        spin_unlock_irqrestore(&ws->ws_thread_lock, flags);

        OBD_FREE(thread, sizeof(*thread));
        EXIT;
}

static void cmobd_stop_all_threads(struct obd_device *obd)
{
        struct cache_manager_obd *cmobd = &obd->u.cmobd;
        struct cmobd_write_service *ws = cmobd->cm_write_srv;
        unsigned long flags;
        struct ptlrpc_thread *thread;
        ENTRY;

        spin_lock_irqsave(&ws->ws_thread_lock, flags);
        while (!list_empty(&ws->ws_threads)) {
                thread = list_entry(ws->ws_threads.next,
                                    struct ptlrpc_thread, t_link);

                spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
                cmobd_stop_thread(obd, thread);
                spin_lock_irqsave(&ws->ws_thread_lock, flags);
        }

        spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
        EXIT;
}

static int cmobd_start_n_threads(struct obd_device *obd, int num_threads,
                                 char *base_name)
{
        int i, rc = 0;
        ENTRY;

        for (i = 0; i < num_threads; i++) {
                char name[32];
                snprintf(name, sizeof(name) - 1, "%s_%02d", base_name, i);
                rc = cmobd_start_thread(obd, name);
                if (rc) {
                        CERROR("cannot start %s thread #%d: rc %d\n", base_name,
                               i, rc);
                        cmobd_stop_all_threads(obd);
                }
        }
        RETURN(rc);
}

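/* stop all write replay threads, free any extents still queued on the write
 * service, then free the service itself */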
void cmobd_cleanup_write_srv(struct obd_device *obd)
{
        struct cache_manager_obd *cmobd = &obd->u.cmobd;
        struct list_head *pos, *n;
        struct cmobd_extent_info *ei;
        ENTRY;

        cmobd_stop_all_threads(obd);

        list_for_each_safe(pos, n, &cmobd->cm_write_srv->ws_extents) {
                ei = list_entry(pos, struct cmobd_extent_info, ei_link);
                list_del_init(&ei->ei_link);
                /* queued extents are allocated from cmobd_extent_slab in
                 * cmobd_replay_write(), so free them back to the slab */
                OBD_SLAB_FREE(ei, cmobd_extent_slab, sizeof(*ei));
        }
        OBD_FREE(cmobd->cm_write_srv, sizeof(*cmobd->cm_write_srv));
        EXIT;
}

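/* allocate and initialize the write replay service and start its threads */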
int cmobd_init_write_srv(struct obd_device *obd)
{
        struct cache_manager_obd *cmobd = &obd->u.cmobd;
        struct cmobd_write_service *ws;
        int rc;
        ENTRY;

        OBD_ALLOC(cmobd->cm_write_srv, sizeof(*cmobd->cm_write_srv));
        if (cmobd->cm_write_srv == NULL)
                RETURN(-ENOMEM);
        ws = cmobd->cm_write_srv;

        INIT_LIST_HEAD(&ws->ws_threads);
        spin_lock_init(&ws->ws_thread_lock);
        ws->ws_nthreads = 0;

        INIT_LIST_HEAD(&ws->ws_extents);
        spin_lock_init(&ws->ws_extent_lock);
        ws->ws_nextents = 0;
        init_waitqueue_head(&ws->ws_waitq_provider);
        init_waitqueue_head(&ws->ws_waitq_consumer);

        rc = cmobd_start_n_threads(obd, CMOBD_NUM_THREADS, "cm_write");
        if (rc)
                cmobd_cleanup_write_srv(obd);

        RETURN(rc);
}

static int extent_queue_full(struct cmobd_write_service *ws)
{
        unsigned long flags;
        int full = 0;

        spin_lock_irqsave(&ws->ws_extent_lock, flags);
        full = (ws->ws_nextents >= CMOBD_MAX_EXTENTS) ? 1 : 0;
        spin_unlock_irqrestore(&ws->ws_extent_lock, flags);

        return full;
}

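/* producer side: wait for room in the extent queue, add the extent, bump the
 * owning set's reference count and wake up the consumer threads */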
static void cmobd_queue_extent(struct obd_device *obd,
                               struct cmobd_extent_info *ex)
{
        struct cache_manager_obd *cmobd = &obd->u.cmobd;
        struct cmobd_write_service *ws = cmobd->cm_write_srv;
        struct cmobd_extent_set *set = ex->ei_set;
        unsigned long flags;
        struct l_wait_info lwi = { 0 };
        ENTRY;

wait:
        l_wait_event(ws->ws_waitq_provider, !extent_queue_full(ws), &lwi);

        spin_lock_irqsave(&ws->ws_extent_lock, flags);
        if (ws->ws_nextents >= CMOBD_MAX_EXTENTS) {
                spin_unlock_irqrestore(&ws->ws_extent_lock, flags);
                goto wait;
        }
        list_add_tail(&ex->ei_link, &ws->ws_extents);
        ws->ws_nextents++;
        spin_unlock_irqrestore(&ws->ws_extent_lock, flags);

        spin_lock_irqsave(&set->es_lock, flags);
        set->es_count++;
        spin_unlock_irqrestore(&set->es_lock, flags);

        wake_up_all(&ws->ws_waitq_consumer);

        EXIT;
}

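/* look up the current file size of object @id in group @grp through the
 * export's lvfs context */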
static obd_size cmobd_fid2size(struct obd_export *exp, obd_id id, obd_gr grp)
{
        struct lvfs_run_ctxt saved;
        struct dentry *de = NULL;
        obd_size size;
        ENTRY;

        push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);

        de = obd_lvfs_fid2dentry(exp, id, 0, grp);
        LASSERT(de);

        size = de->d_inode->i_size;

        dput(de);
        pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);

        RETURN(size);
}

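/* phase 1: all queued extents have been processed (es_count == 0);
 * phase 2: additionally, no async pages of the set remain in flight */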
static int extent_set_done(struct cmobd_extent_set *set, int phase)
{
        int done = 0;
        unsigned long flags;

        spin_lock_irqsave(&set->es_lock, flags);
        if (phase == 1)
                done = set->es_count ? 0 : 1;
        else if (phase == 2)
                done = (!set->es_count && list_empty(&set->es_pages)) ? 1 : 0;
        spin_unlock_irqrestore(&set->es_lock, flags);

        return done;
}

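/* replay a cached write described by @oa and @ext to the master OBD: build a
 * dummy lsm and an extent set, chop the range into CMOBD_MAX_EXTENT_SZ pieces
 * and queue them for the write threads, then wait for all extents and their
 * async pages to complete */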
int cmobd_replay_write(struct obd_device *obd, struct obdo *oa,
                       struct ldlm_extent *ext)
{
        struct cache_manager_obd *cmobd = &obd->u.cmobd;
        struct lov_obd *lov = &cmobd->cm_master_obd->u.lov;
        struct lov_stripe_md *lsm = NULL;
        struct cmobd_extent_set set;
        struct cmobd_extent_info *ex;
        struct l_wait_info lwi = { 0 };
        struct list_head *pos, *n;
        struct cmobd_async_page *cmap;
        unsigned long flags;
        obd_count i, buf_count;
        obd_off start;
        int rc = 0;
        ENTRY;

        rc = cmobd_dummy_lsm(&lsm, lov->desc.ld_tgt_count, oa,
                             (__u32)lov->desc.ld_default_stripe_size);
        if (rc)
                RETURN(-ENOMEM);

        set.es_extent.start = ext->start;
        set.es_extent.end = ext->end;
        set.es_lsm = lsm;
        set.es_exp = cmobd->cm_master_exp;
        set.es_ext_sz = CMOBD_MAX_EXTENT_SZ;
        set.es_count = 0;
        memcpy(&set.es_oa, oa, sizeof(*oa));

        INIT_LIST_HEAD(&set.es_pages);
        spin_lock_init(&set.es_lock);
        init_waitqueue_head(&set.es_waitq);

        if (set.es_extent.end < set.es_extent.start) {
                CDEBUG(D_HA, "illegal extent in write replay\n");
                GOTO(out, rc = -EINVAL);
        }
        /* round the start of the extent down to a page boundary */
        set.es_extent.start -= set.es_extent.start & ~PAGE_MASK;
        /* if the end of the extent is EOF, use the current file size instead */
        if (set.es_extent.end == OBD_OBJECT_EOF) {
                set.es_extent.end = cmobd_fid2size(cmobd->cm_cache_exp,
                                                   oa->o_id, oa->o_gr) - 1;
                if (set.es_extent.end <= 0)
                        GOTO(out, rc = 0);
        }

        buf_count = split_extent(&set.es_extent, set.es_ext_sz);
        for (i = 0, start = set.es_extent.start; i < buf_count;
             i++, start += set.es_ext_sz) {
                OBD_SLAB_ALLOC(ex, cmobd_extent_slab, SLAB_NOFS, sizeof(*ex));
                if (ex == NULL) {
                        CERROR("not enough memory\n");
                        break;
                }

                INIT_LIST_HEAD(&ex->ei_link);
                ex->ei_set = &set;
                ex->ei_extent.start = start;
                ex->ei_extent.end = start + set.es_ext_sz - 1;
                if (ex->ei_extent.end > set.es_extent.end)
                        ex->ei_extent.end = set.es_extent.end;

                cmobd_queue_extent(obd, ex);
        }

        l_wait_event(set.es_waitq, extent_set_done(&set, 1), &lwi);

        /* fire remaining ios */
        spin_lock_irqsave(&set.es_lock, flags);
        list_for_each_safe (pos, n, &set.es_pages) {
                cmap = list_entry(pos, struct cmobd_async_page, cmap_link);

                /* locked pages are in flight */
                if (PageLocked(cmap->cmap_page))
                        continue;

                spin_unlock_irqrestore(&set.es_lock, flags);
                rc = obd_set_async_flags(set.es_exp, set.es_lsm, NULL,
                                         cmap->cmap_cookie,
                                         ASYNC_URGENT);
                if (rc)
                        CERROR("cmobd set async flags failed\n");
                spin_lock_irqsave(&set.es_lock, flags);
                break;
        }
        spin_unlock_irqrestore(&set.es_lock, flags);

        l_wait_event(set.es_waitq, extent_set_done(&set, 2), &lwi);
out:
        cmobd_free_lsm(&lsm);
        RETURN(rc);
}