Whamcloud - gitweb
424ae98faf2635991a370d5903218ebe475c019a
[fs/lustre-release.git] / lustre / cmobd / cmobd_write.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002 Cluster File Systems, Inc. <info@clusterfs.com>
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #define DEBUG_SUBSYSTEM S_CMOBD
23
24 #include <linux/version.h>
25 #include <linux/init.h>
26 #include <linux/obd_support.h>
27 #include <linux/lustre_lib.h>
28 #include <linux/lustre_net.h>
29 #include <linux/lustre_idl.h>
30 #include <linux/obd_class.h>
31 #include <linux/lustre_mds.h>
32 #include <linux/lustre_cmobd.h>
33
34 #include <asm/div64.h>
35 #include <linux/pagemap.h>
36
37 #include "cmobd_internal.h"
38
39 extern kmem_cache_t *cmobd_extent_slab;
40
41 /* helper function to split an extent */
42 static obd_count split_extent(struct ldlm_extent *ext, unsigned long interval)
43 {
44         obd_count buf_count, remainder;
45         ENTRY;
46         
47         buf_count = ext->end - ext->start + 1;
48         LASSERT(buf_count > 0);
49         
50         remainder = do_div(buf_count, interval);
51         if (remainder)
52                 buf_count++;
53
54         RETURN(buf_count);
55 }
56
57 static int cmobd_ap_make_ready(void *data, int cmd)
58 {
59         struct cmobd_async_page *cmap = (struct cmobd_async_page *)data;
60         struct page *page = cmap->cmap_page;
61         ENTRY;
62         
63         if (cmd == OBD_BRW_READ)
64                 RETURN(0);
65         
66         if (TryLockPage(page))
67                 RETURN(-EAGAIN);
68
69         RETURN(0);
70 }
71
72 static int cmobd_ap_refresh_count(void *data, int cmd)
73 {
74         struct cmobd_async_page *cmap = (struct cmobd_async_page *)data;
75         struct page *page = cmap->cmap_page;
76         struct inode *inode = page->mapping->host;
77         ENTRY;
78
79         LASSERT(cmd != OBD_BRW_READ);
80
81         /* catch race with truncate */
82         if (((loff_t)page->index << PAGE_SHIFT) >= inode->i_size)
83                 RETURN(0);
84
85         /* catch sub-page write at end of file */
86         if (((loff_t)page->index << PAGE_SHIFT) + PAGE_SIZE > inode->i_size)
87                 RETURN(inode->i_size % PAGE_SIZE);
88
89         RETURN(PAGE_SIZE);
90 }
91
92 static void cmobd_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
93 {
94         struct cmobd_async_page *cmap = (struct cmobd_async_page *)data;
95         obd_flag valid_flags;
96         struct inode *inode;
97         ENTRY;
98
99         if (IS_ERR(cmap)) {
100                 EXIT;
101                 return;
102         }
103
104         inode = cmap->cmap_page->mapping->host;
105         oa->o_id = cmap->cmap_es->es_oa.o_id;
106         oa->o_gr = cmap->cmap_es->es_oa.o_gr;
107         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
108         valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
109         if (cmd == OBD_BRW_WRITE) {
110                 oa->o_valid |= OBD_MD_FLIFID;
111                 mdc_pack_fid(obdo_fid(oa), inode->i_ino, 0, inode->i_mode);
112
113                 valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
114         }
115
116         obdo_from_inode(oa, inode, valid_flags);
117
118         EXIT;
119         return;
120 }
121
122 static void cmobd_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
123 {
124         struct cmobd_async_page *cmap = (struct cmobd_async_page *)data;
125         struct cmobd_extent_set *set = cmap->cmap_es;
126         unsigned long flags;
127         struct page *page;
128         int wakeup = 0;
129         ENTRY;
130
131         page = cmap->cmap_page;
132         LASSERT(PageLocked(page));
133         
134         /* XXX */
135         if (rc)
136                 SetPageError(page);
137         
138         spin_lock_irqsave(&set->es_lock, flags);
139         LASSERT(!list_empty(&set->es_pages));
140         LASSERT(!list_empty(&cmap->cmap_link));
141         
142         list_del_init(&cmap->cmap_link);
143         if (list_empty(&set->es_pages) && !set->es_count)
144                 wakeup = 1;
145         spin_unlock_irqrestore(&set->es_lock, flags);
146
147         obd_teardown_async_page(set->es_exp, set->es_lsm, NULL, 
148                                 cmap->cmap_cookie);
149         OBD_FREE(cmap, sizeof(*cmap));
150
151         unlock_page(page);
152         page_cache_release(page);
153         
154         if (wakeup)
155                 wake_up(&set->es_waitq);
156         EXIT;
157         return;
158 }
159
160 static struct obd_async_page_ops cmobd_async_page_ops = {
161         .ap_make_ready =        cmobd_ap_make_ready,
162         .ap_refresh_count =     cmobd_ap_refresh_count,
163         .ap_fill_obdo =         cmobd_ap_fill_obdo,
164         .ap_completion =        cmobd_ap_completion,
165 };
166
167 static int cmobd_send_pages(struct obd_device *obd, 
168                             struct niobuf_local *lnb,
169                             obd_count oa_bufs,
170                             struct cmobd_extent_set *set)
171 {
172         struct cache_manager_obd *cmobd = &obd->u.cmobd;
173         struct obd_export *exp = cmobd->cm_master_exp;
174         struct cmobd_async_page *cmap = NULL;
175         obd_count i;
176         int rc = 0;
177         unsigned long flags;
178         ENTRY;
179  
180         for (i = 0; i < oa_bufs; i++, lnb++) {
181                 
182                 OBD_ALLOC(cmap, sizeof(*cmap));
183                 if (cmap == NULL) {
184                         CERROR("Not enought memory\n");
185                         rc = -ENOMEM;
186                         break;
187                 }
188                 INIT_LIST_HEAD(&cmap->cmap_link);
189                 cmap->cmap_page = lnb->page;
190                 cmap->cmap_es = set;
191                         
192                 rc = obd_prep_async_page(exp, set->es_lsm, NULL, lnb->page,
193                                          lnb->offset, &cmobd_async_page_ops, 
194                                          cmap, &cmap->cmap_cookie);
195                 if (rc) {
196                         CERROR("cmobd prep async page failed page(%p) rc(%d)\n", 
197                                lnb->page, rc);
198                         OBD_FREE(cmap, sizeof(*cmap));
199                         break;
200                 }
201
202                 LASSERT(cmap->cmap_page);
203                 LASSERT(!PageLocked(cmap->cmap_page));
204                 LASSERT(Page_Uptodate(cmap->cmap_page));
205                 page_cache_get(cmap->cmap_page);
206
207                 spin_lock_irqsave(&set->es_lock, flags);
208                 list_add_tail(&cmap->cmap_link, &set->es_pages);
209                 spin_unlock_irqrestore(&set->es_lock, flags);
210                 
211                 rc = obd_queue_async_io(exp, set->es_lsm, NULL, cmap->cmap_cookie,
212                                         OBD_BRW_WRITE, 0, 0, 0, 0);
213                 if (rc) {  /* try sync io */
214                         struct obd_io_group *oig;
215                         
216                         spin_lock_irqsave(&set->es_lock, flags);
217                         list_del_init(&cmap->cmap_link);
218                         spin_unlock_irqrestore(&set->es_lock, flags);
219
220                         lock_page(cmap->cmap_page);
221                         
222                         rc = oig_init(&oig);
223                         if (rc)
224                                 GOTO(free_page, rc);
225
226                         rc = obd_queue_group_io(exp, set->es_lsm, NULL, oig,
227                                                 cmap->cmap_cookie,
228                                                 OBD_BRW_WRITE, 0, lnb->len, 0,
229                                                 ASYNC_READY | ASYNC_URGENT |
230                                                 ASYNC_COUNT_STABLE |
231                                                 ASYNC_GROUP_SYNC);
232
233                         if (rc)
234                                 GOTO(free_oig, rc);
235
236                         rc = obd_trigger_group_io(exp, set->es_lsm, NULL, oig);
237                         if (rc)
238                                 GOTO(free_oig, rc);
239
240                         rc = oig_wait(oig);
241 free_oig:
242                         oig_release(oig);
243 free_page:
244                         unlock_page(cmap->cmap_page);
245                         page_cache_release(cmap->cmap_page);
246                         obd_teardown_async_page(exp, set->es_lsm, NULL, 
247                                                 cmap->cmap_cookie);
248                         OBD_FREE(cmap, sizeof(*cmap));
249                         if (rc) {
250                                 CERROR("cmobd sync io failed\n");
251                                 break;
252                         }
253                 }
254         }
255         RETURN(rc);
256 }
257
258 static int cmobd_write_extent(struct obd_device *obd, 
259                               struct cmobd_extent_info *ei)
260 {
261         struct cmobd_extent_set *set = ei->ei_set;
262         struct cache_manager_obd *cmobd = &obd->u.cmobd;
263         unsigned long flags;
264         struct obd_ioobj ioo;
265         struct niobuf_local *lnb;
266         struct niobuf_remote *rnb;
267         obd_count i, oa_bufs;
268         struct obdo *oa;
269         obd_off offset;
270         int ret, rc = 0, wakeup = 0;
271         ENTRY;
272
273         oa_bufs = split_extent(&ei->ei_extent, PAGE_SIZE);
274         LASSERT(oa_bufs > 0);
275
276         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
277         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
278         oa = obdo_alloc();
279         
280         if (lnb == NULL || rnb == NULL || oa == NULL)
281                 GOTO(out, rc = -ENOMEM);
282
283         LASSERT(ei->ei_extent.end >= ei->ei_extent.start);
284         LASSERT((ei->ei_extent.start & (PAGE_SIZE -1)) == 0);
285         
286         for (i = 0, offset = ei->ei_extent.start; i < oa_bufs; 
287              i++, offset += PAGE_SIZE) {
288                 rnb[i].offset = offset;
289                 rnb[i].len = MIN(PAGE_SIZE, ei->ei_extent.end - offset + 1);
290         }
291
292         memcpy(oa, &set->es_oa, sizeof(*oa));
293         obdo_to_ioobj(oa, &ioo);
294         ioo.ioo_bufcnt = oa_bufs;
295
296         ret = obd_preprw(OBD_BRW_READ, cmobd->cm_cache_exp, oa, 1, &ioo, 
297                          oa_bufs, rnb, lnb, NULL);
298         if (ret)
299                 GOTO(out, rc = ret);
300
301         rc = cmobd_send_pages(obd, lnb, oa_bufs, set);
302         if (rc)
303                 CERROR("cmobd_send_pages failed %d\n", rc);
304
305         rc = obd_commitrw(OBD_BRW_READ, cmobd->cm_cache_exp, oa, 1, &ioo,
306                           oa_bufs, lnb, NULL, ret);
307
308         /* countdown and wake up */
309         spin_lock_irqsave(&set->es_lock, flags);
310         LASSERT(set->es_count);
311         set->es_count--;
312         if (!set->es_count)
313                 wakeup = 1;
314         spin_unlock_irqrestore(&set->es_lock, flags);
315
316         if (wakeup)
317                 wake_up(&set->es_waitq);
318
319 out: 
320         if (lnb)
321                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
322         if (rnb)
323                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
324         if (oa)
325                 obdo_free(oa);
326
327         RETURN(rc);
328 }
329
330 static struct cmobd_extent_info* get_next_ei(struct cmobd_write_service *ws)
331 {
332         struct cmobd_extent_info *ei = NULL;
333         unsigned long flags;
334         int wakeup = 0;
335
336         spin_lock_irqsave(&ws->ws_extent_lock, flags);
337         if (!list_empty(&ws->ws_extents)) {
338                 ei = list_entry(ws->ws_extents.next, 
339                                 struct cmobd_extent_info, ei_link);
340                 list_del_init(&ei->ei_link);
341                 ws->ws_nextents--;
342                 if (ws->ws_nextents < CMOBD_MAX_EXTENTS)
343                         wakeup = 1;
344         }
345         spin_unlock_irqrestore(&ws->ws_extent_lock, flags);
346
347         if (wakeup)
348                 wake_up_all(&ws->ws_waitq_provider);
349
350         return ei;
351 }
352        
353 static int cmobd_write_main(void *arg)
354 {
355         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
356         struct ptlrpc_thread   *thread = data->thread;
357         struct obd_device *obd = data->dev;
358         struct cache_manager_obd *cmobd = &obd->u.cmobd;
359         struct cmobd_write_service *ws = cmobd->cm_write_srv;
360         struct cmobd_extent_info *extent = NULL;
361         unsigned long flags;
362         int rc;
363         ENTRY;
364
365         lock_kernel();
366         /* vv ptlrpc_daemonize(); vv */
367         exit_mm(current);
368
369         current->session = 1;
370         current->pgrp = 1;
371         current->tty = NULL;
372
373         exit_files(current);
374         reparent_to_init();
375         /* ^^ ptlrpc_daemonize(); ^^ */
376
377         SIGNAL_MASK_LOCK(current, flags);
378         sigfillset(&current->blocked);
379         RECALC_SIGPENDING;
380         SIGNAL_MASK_UNLOCK(current, flags);
381
382         LASSERTF(strlen(data->name) < sizeof(current->comm),
383                  "name %d > len %d\n",strlen(data->name),sizeof(current->comm));
384         THREAD_NAME(current->comm, sizeof(current->comm) - 1, "%s", data->name);
385
386         unlock_kernel();
387
388         thread->t_flags = SVC_RUNNING;
389         wake_up(&thread->t_ctl_waitq);
390
391         /* Record that the thread is running */
392         spin_lock_irqsave(&ws->ws_thread_lock, flags);
393         ws->ws_nthreads++;
394         spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
395
396         while ((thread->t_flags & SVC_STOPPING) == 0) {
397                 struct l_wait_info lwi = { 0 };
398                                   
399                 l_wait_event_exclusive(ws->ws_waitq_consumer,
400                                        ((thread->t_flags & SVC_STOPPING) ||
401                                         ((extent = get_next_ei(ws)) != 
402                                           NULL)),
403                                        &lwi);
404                 if (extent == NULL)
405                         continue;
406                 rc = cmobd_write_extent(obd, extent);
407                 if (rc)
408                         CERROR("write extent failed rc=%d\n", rc);
409                 OBD_SLAB_FREE(extent, cmobd_extent_slab, sizeof(*extent));
410                 extent = NULL;
411         }
412  
413         thread->t_flags = SVC_STOPPED;
414         wake_up(&thread->t_ctl_waitq);
415        
416         spin_lock_irqsave(&ws->ws_thread_lock, flags);
417         ws->ws_nthreads--;                    /* must know immediately */
418         spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
419
420         RETURN(0);
421 }
422
423 /* functions for manipulating cmobd write replay threads, similar with 
424  * ptlrpc threads functions */
425 static int cmobd_start_thread(struct obd_device *obd, char *name)
426 {
427         struct cache_manager_obd *cmobd = &obd->u.cmobd;
428         struct cmobd_write_service *ws = cmobd->cm_write_srv;
429         struct l_wait_info lwi = { 0 };
430         struct ptlrpc_svc_data d;
431         struct ptlrpc_thread *thread;
432         unsigned long flags;
433         int rc;
434         ENTRY;
435
436         OBD_ALLOC(thread, sizeof(*thread));
437         if (thread == NULL)
438                 RETURN(-ENOMEM);
439         init_waitqueue_head(&thread->t_ctl_waitq);
440         
441         d.dev = obd;
442         d.svc = NULL;
443         d.name = name;
444         d.thread = thread;
445
446         spin_lock_irqsave(&ws->ws_thread_lock, flags);
447         list_add(&thread->t_link, &ws->ws_threads);
448         spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
449
450         /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
451          * just drop the VM and FILES in ptlrpc_daemonize() right away.
452          */
453         rc = kernel_thread(cmobd_write_main, &d, CLONE_VM | CLONE_FILES);
454         if (rc < 0) {
455                 CERROR("cannot start thread: %d\n", rc);
456                 spin_lock_irqsave(&ws->ws_thread_lock, flags);
457                 list_del_init(&thread->t_link);
458                 spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
459                 OBD_FREE(thread, sizeof(*thread));
460                 RETURN(rc);
461         }
462         l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING, &lwi);
463
464         RETURN(0);
465
466 }
467
468 static void cmobd_stop_thread(struct obd_device *obd, 
469                               struct ptlrpc_thread *thread)
470 {
471         struct cache_manager_obd *cmobd = &obd->u.cmobd;
472         struct cmobd_write_service *ws = cmobd->cm_write_srv;
473         struct l_wait_info lwi = { 0 };
474         unsigned long flags;
475         ENTRY;
476
477         thread->t_flags = SVC_STOPPING;
478         wake_up_all(&ws->ws_waitq_consumer);
479
480         l_wait_event(thread->t_ctl_waitq, (thread->t_flags & SVC_STOPPED),
481                      &lwi);
482
483         spin_lock_irqsave(&ws->ws_thread_lock, flags);
484         list_del(&thread->t_link);
485         spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
486         
487         OBD_FREE(thread, sizeof(*thread));
488         EXIT;
489 }
490
491 static void cmobd_stop_all_threads(struct obd_device *obd)
492 {
493         struct cache_manager_obd *cmobd = &obd->u.cmobd;
494         struct cmobd_write_service *ws = cmobd->cm_write_srv;
495         unsigned long flags;
496         struct ptlrpc_thread *thread;
497         ENTRY;
498
499         spin_lock_irqsave(&ws->ws_thread_lock, flags);
500         while (!list_empty(&ws->ws_threads)) {
501                 thread = list_entry(ws->ws_threads.next, 
502                                     struct ptlrpc_thread, t_link);
503
504                 spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
505                 cmobd_stop_thread(obd, thread);
506                 spin_lock_irqsave(&ws->ws_thread_lock, flags);
507         }
508
509         spin_unlock_irqrestore(&ws->ws_thread_lock, flags);
510         EXIT;
511 }
512
513 static int cmobd_start_n_threads(struct obd_device *obd, int num_threads, 
514                                  char *base_name)
515 {
516         int i, rc = 0;
517         ENTRY;
518
519         for (i = 0; i < num_threads; i++) {
520                 char name[32];
521                 snprintf(name, sizeof(name) - 1, "%s_%02d", base_name, i);
522                 rc = cmobd_start_thread(obd, name);
523                 if (rc) {
524                         CERROR("cannot start %s thread #%d: rc %d\n", base_name,
525                                i, rc);
526                         cmobd_stop_all_threads(obd);
527                 }
528         }
529         RETURN(rc);
530 }
531
532 void cmobd_cleanup_write_srv(struct obd_device *obd)
533 {
534         struct cache_manager_obd *cmobd = &obd->u.cmobd;
535         struct list_head *pos, *n;
536         struct cmobd_extent_info *ei;
537         ENTRY;
538         
539         cmobd_stop_all_threads(obd);
540         
541         list_for_each_safe(pos, n, &cmobd->cm_write_srv->ws_extents) {
542                 ei = list_entry(pos, struct cmobd_extent_info, ei_link);
543                 list_del_init(&ei->ei_link);
544                 OBD_FREE(ei, sizeof(*ei));
545         }
546         OBD_FREE(cmobd->cm_write_srv, sizeof(*cmobd->cm_write_srv));
547         EXIT;
548 }
549
550 int cmobd_init_write_srv(struct obd_device *obd)
551 {
552         struct cache_manager_obd *cmobd = &obd->u.cmobd;
553         struct cmobd_write_service *ws;
554         int rc;
555         ENTRY;
556
557         OBD_ALLOC(cmobd->cm_write_srv, sizeof(*cmobd->cm_write_srv));
558         if (cmobd->cm_write_srv == NULL)
559                 RETURN(-ENOMEM);
560         ws = cmobd->cm_write_srv;
561         
562         INIT_LIST_HEAD(&ws->ws_threads);
563         spin_lock_init(&ws->ws_thread_lock);
564         ws->ws_nthreads = 0;
565
566         INIT_LIST_HEAD(&ws->ws_extents);
567         spin_lock_init(&ws->ws_extent_lock);
568         ws->ws_nextents = 0;
569         init_waitqueue_head(&ws->ws_waitq_provider);
570         init_waitqueue_head(&ws->ws_waitq_consumer);
571
572         rc = cmobd_start_n_threads(obd, CMOBD_NUM_THREADS, "cm_write");
573         if (rc) 
574                 cmobd_cleanup_write_srv(obd);
575         
576         RETURN(rc);
577 }
578
579 static int extent_queue_full(struct cmobd_write_service *ws)
580 {
581         unsigned long flags;
582         int full = 0;
583         
584         spin_lock_irqsave(&ws->ws_extent_lock, flags);
585         full = (ws->ws_nextents >= CMOBD_MAX_EXTENTS) ? 1 : 0;
586         spin_unlock_irqrestore(&ws->ws_extent_lock, flags);
587
588         return full;
589 }
590         
591 static void cmobd_queue_extent(struct obd_device *obd, 
592                                struct cmobd_extent_info *ex)
593 {
594         struct cache_manager_obd *cmobd = &obd->u.cmobd;
595         struct cmobd_write_service *ws = cmobd->cm_write_srv;
596         struct cmobd_extent_set *set = ex->ei_set;
597         unsigned long flags;
598         struct l_wait_info lwi = { 0 };
599         ENTRY;
600
601 wait:
602         l_wait_event(ws->ws_waitq_provider, !extent_queue_full(ws), &lwi);
603         
604         spin_lock_irqsave(&ws->ws_extent_lock, flags);
605         if (ws->ws_nextents >= CMOBD_MAX_EXTENTS) {
606                 spin_unlock_irqrestore(&ws->ws_extent_lock, flags);
607                 goto wait;
608         }
609         list_add_tail(&ex->ei_link, &ws->ws_extents);
610         ws->ws_nextents++;
611         spin_unlock_irqrestore(&ws->ws_extent_lock, flags);
612                 
613         spin_lock_irqsave(&set->es_lock, flags);
614         set->es_count++;
615         spin_unlock_irqrestore(&set->es_lock, flags);        
616
617         wake_up_all(&ws->ws_waitq_consumer);
618
619         EXIT;
620
621
622 static obd_size cmobd_fid2size(struct obd_export *exp, obd_id id, obd_gr grp)
623 {
624         struct lvfs_run_ctxt saved;
625         struct dentry *de = NULL;
626         obd_size size;
627         ENTRY;
628         
629         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
630         
631         de = obd_lvfs_fid2dentry(exp, id, 0, grp);
632         LASSERT(de);
633
634         size = de->d_inode->i_size;
635
636         dput(de);
637         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
638
639         RETURN(size);
640 }
641
642 static int extent_set_done(struct cmobd_extent_set *set, int phase)
643 {
644         int done = 0;
645         unsigned long flags;
646
647         spin_lock_irqsave(&set->es_lock, flags);
648         if (phase == 1)
649                 done = set->es_count ? 0 : 1;
650         else if (phase == 2) 
651                 done = (!set->es_count && list_empty(&set->es_pages)) ? 1 : 0;
652         spin_unlock_irqrestore(&set->es_lock, flags);
653
654         return done;
655 }
656
657 int cmobd_replay_write(struct obd_device *obd, struct obdo *oa, 
658                        struct ldlm_extent *ext)
659 {
660         struct cache_manager_obd *cmobd = &obd->u.cmobd;
661         struct lov_obd *lov = &cmobd->cm_master_obd->u.lov;
662         struct lov_stripe_md *lsm = NULL;
663         struct cmobd_extent_set set;
664         struct cmobd_extent_info *ex;
665         struct l_wait_info lwi = { 0 };
666         struct list_head *pos, *n;
667         struct cmobd_async_page *cmap;
668         unsigned long flags;
669         obd_count i, buf_count;
670         obd_off start;
671         int rc = 0;
672         ENTRY;
673
674         rc = cmobd_dummy_lsm(&lsm, lov->desc.ld_tgt_count, oa,
675                              (__u32)lov->desc.ld_default_stripe_size);
676         if (rc)
677                 RETURN(-ENOMEM);
678
679         set.es_extent.start = ext->start;
680         set.es_extent.end = ext->end;
681         set.es_lsm = lsm;
682         set.es_exp = cmobd->cm_master_exp;
683         set.es_ext_sz = CMOBD_MAX_EXTENT_SZ;
684         set.es_count = 0;
685         memcpy(&set.es_oa, oa, sizeof(*oa));
686         
687         INIT_LIST_HEAD(&set.es_pages);
688         spin_lock_init(&set.es_lock);
689         init_waitqueue_head(&set.es_waitq);
690         
691         if (set.es_extent.end < set.es_extent.start) {
692                 CDEBUG(D_HA, "illegal extent in write replay\n");
693                 GOTO(out, rc = -EINVAL);
694         }
695         /* start of extent is extended to page boundaries */
696         set.es_extent.start -= set.es_extent.start & ~PAGE_MASK;
697         /* if the end of extent is EOF, set it as file size */
698         if (set.es_extent.end == OBD_OBJECT_EOF) {
699                 set.es_extent.end = cmobd_fid2size(cmobd->cm_cache_exp, 
700                                                    oa->o_id, oa->o_gr) - 1;
701                 if (set.es_extent.end <= 0)
702                         GOTO(out, rc = 0);
703         }
704         
705         buf_count = split_extent(&set.es_extent, set.es_ext_sz);
706         for (i = 0, start = set.es_extent.start; i < buf_count; 
707              i++, start += set.es_ext_sz) {
708                 OBD_SLAB_ALLOC(ex, cmobd_extent_slab, SLAB_NOFS, sizeof(*ex));
709                 if (ex == NULL) {
710                         CERROR("not enough memory\n");
711                         break;
712                 }
713
714                 INIT_LIST_HEAD(&ex->ei_link);
715                 ex->ei_set = &set;
716                 ex->ei_extent.start = start;
717                 ex->ei_extent.end = start + set.es_ext_sz - 1;
718                 if (ex->ei_extent.end > set.es_extent.end)
719                         ex->ei_extent.end = set.es_extent.end;
720
721                 cmobd_queue_extent(obd, ex);
722         }
723         
724         l_wait_event(set.es_waitq, extent_set_done(&set, 1), &lwi);
725         
726         /* fire remaining ios */
727         spin_lock_irqsave(&set.es_lock, flags);
728         list_for_each_safe (pos, n, &set.es_pages) {
729                 cmap = list_entry(pos, struct cmobd_async_page, cmap_link);
730
731                 /* locked pages are in flight */
732                 if (PageLocked(cmap->cmap_page))
733                         continue;
734                 
735                 spin_unlock_irqrestore(&set.es_lock, flags);
736                 rc = obd_set_async_flags(set.es_exp, set.es_lsm, NULL, 
737                                          cmap->cmap_cookie, 
738                                          ASYNC_URGENT);
739                 if (rc)
740                         CERROR("cmobd set async flags failed\n");
741                 spin_lock_irqsave(&set.es_lock, flags);
742                 break;
743         }
744         spin_unlock_irqrestore(&set.es_lock, flags);
745
746         l_wait_event(set.es_waitq, extent_set_done(&set, 2), &lwi);
747 out:
748         cmobd_free_lsm(&lsm);
749         RETURN(rc);
750 }