Whamcloud - gitweb
- landed b_hd_cray_merge3
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LOV
26
27 #ifdef __KERNEL__
28 #include <asm/div64.h>
29 #else
30 #include <liblustre.h>
31 #endif
32
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_idl.h>
36
37 #include "lov_internal.h"
38
39 static void lov_init_set(struct lov_request_set *set)
40 {
41         set->set_count = 0;
42         set->set_completes = 0;
43         set->set_success = 0;
44         INIT_LIST_HEAD(&set->set_list);
45         atomic_set(&set->set_refcount, 1);
46 }
47
48 static void lov_finish_set(struct lov_request_set *set)
49 {
50         struct list_head *pos, *n;
51         ENTRY;
52
53         LASSERT(set);
54         list_for_each_safe(pos, n, &set->set_list) {
55                 struct lov_request *req = list_entry(pos, struct lov_request,
56                                                      rq_link);
57                 list_del_init(&req->rq_link);
58
59                 if (req->rq_oa)
60                         obdo_free(req->rq_oa);
61                 if (req->rq_md)
62                         OBD_FREE(req->rq_md, req->rq_buflen);
63                 OBD_FREE(req, sizeof(*req));
64         }
65
66         if (set->set_pga) {
67                 int len = set->set_oabufs * sizeof(*set->set_pga);
68                 OBD_FREE(set->set_pga, len);
69         }
70         if (set->set_lockh)
71                 lov_llh_put(set->set_lockh);
72
73         OBD_FREE(set, sizeof(*set));
74         EXIT;
75 }
76
77 static void lov_update_set(struct lov_request_set *set,
78                            struct lov_request *req, int rc)
79 {
80         req->rq_complete = 1;
81         req->rq_rc = rc;
82
83         set->set_completes++;
84         if (rc == 0)
85                 set->set_success++;
86 }
87
88 int lov_update_common_set(struct lov_request_set *set,
89                           struct lov_request *req, int rc)
90 {
91         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
92         ENTRY;
93
94         lov_update_set(set, req, rc);
95
96         /* grace error on inactive ost */
97         if (rc && !lov->tgts[req->rq_idx].active)
98                 rc = 0;
99
100         /* FIXME in raid1 regime, should return 0 */
101         RETURN(rc);
102 }
103
104 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
105 {
106         list_add_tail(&req->rq_link, &set->set_list);
107         set->set_count++;
108 }
109
110 int lov_update_enqueue_set(struct lov_request_set *set,
111                            struct lov_request *req, int rc, int flags)
112 {
113         struct lustre_handle *lov_lockhp;
114         struct lov_oinfo *loi;
115         ENTRY;
116
117         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
118         loi = &set->set_md->lsm_oinfo[req->rq_stripe];
119
120         /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
121          * It belongs in the OSC, except that the OSC doesn't have
122          * access to the real LOI -- it gets a copy, that we created
123          * above, and that copy can be arbitrarily out of date.
124          *
125          * The LOV API is due for a serious rewriting anyways, and this
126          * can be addressed then. */
127         if (rc == ELDLM_OK) {
128                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
129                 __u64 tmp = req->rq_md->lsm_oinfo->loi_rss;
130
131                 LASSERT(lock != NULL);
132                 loi->loi_rss = tmp;
133                 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
134                 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
135                 /* Extend KMS up to the end of this lock and no further
136                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
137                 if (tmp > lock->l_policy_data.l_extent.end)
138                         tmp = lock->l_policy_data.l_extent.end + 1;
139                 if (tmp >= loi->loi_kms) {
140                         CDEBUG(D_INODE, "lock acquired, setting rss="
141                                LPU64", kms="LPU64"\n", loi->loi_rss, tmp);
142                         loi->loi_kms = tmp;
143                         loi->loi_kms_valid = 1;
144                 } else {
145                         CDEBUG(D_INODE, "lock acquired, setting rss="
146                                LPU64"; leaving kms="LPU64", end="LPU64
147                                "\n", loi->loi_rss, loi->loi_kms,
148                                lock->l_policy_data.l_extent.end);
149                 }
150                 ldlm_lock_allow_match(lock);
151                 LDLM_LOCK_PUT(lock);
152         } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) {
153                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
154                 loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss;
155                 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
156                 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
157                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
158                        " kms="LPU64"\n", loi->loi_rss, loi->loi_kms);
159                 rc = ELDLM_OK;
160         } else {
161                 struct obd_export *exp = set->set_exp;
162                 struct lov_obd *lov = &exp->exp_obd->u.lov;
163
164                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
165                 if (lov->tgts[req->rq_idx].active) {
166                         CERROR("error: enqueue objid "LPX64" subobj "
167                                 LPX64" on OST idx %d: rc = %d\n",
168                                 set->set_md->lsm_object_id, loi->loi_id,
169                                 loi->loi_ost_idx, rc);
170                 } else {
171                         rc = ELDLM_OK;
172                 }
173         }
174         lov_update_set(set, req, rc);
175         RETURN(rc);
176 }
177
178 static int enqueue_done(struct lov_request_set *set, __u32 mode)
179 {
180         struct list_head *pos;
181         struct lov_request *req;
182         struct lustre_handle *lov_lockhp = NULL;
183         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
184         int rc = 0;
185         ENTRY;
186
187         LASSERT(set->set_completes);
188         /* enqueue/match success, just return */
189         if (set->set_completes == set->set_success)
190                 RETURN(0);
191
192         /* cancel enqueued/matched locks */
193         list_for_each (pos, &set->set_list) {
194                 req = list_entry(pos, struct lov_request, rq_link);
195
196                 if (!req->rq_complete || req->rq_rc)
197                         continue;
198
199                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
200                 LASSERT(lov_lockhp);
201                 if (lov_lockhp->cookie == 0)
202                         continue;
203
204                 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
205                                 mode, lov_lockhp);
206                 if (rc && lov->tgts[req->rq_idx].active)
207                         CERROR("cancelling obdjid "LPX64" on OST "
208                                "idx %d error: rc = %d\n",
209                                req->rq_md->lsm_object_id, req->rq_idx, rc);
210         }
211         lov_llh_put(set->set_lockh);
212         RETURN(rc);
213 }
214
215 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
216 {
217         int rc = 0;
218         ENTRY;
219
220         LASSERT(set->set_exp);
221         if (set == NULL)
222                 RETURN(0);
223         if (set->set_completes)
224                 rc = enqueue_done(set, mode);
225         else
226                 lov_llh_put(set->set_lockh);
227
228         if (atomic_dec_and_test(&set->set_refcount))
229                 lov_finish_set(set);
230
231         RETURN(rc);
232 }
233
234 int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
235                          ldlm_policy_data_t *policy, __u32 mode,
236                          struct lustre_handle *lockh,
237                          struct lov_request_set **reqset)
238 {
239         struct lov_obd *lov = &exp->exp_obd->u.lov;
240         struct lov_request_set *set;
241         int i, rc = 0;
242         struct lov_oinfo *loi;
243         ENTRY;
244
245         OBD_ALLOC(set, sizeof(*set));
246         if (set == NULL)
247                 RETURN(-ENOMEM);
248         lov_init_set(set);
249
250         set->set_exp = exp;
251         set->set_md = lsm;
252         set->set_lockh = lov_llh_new(lsm);
253         if (set->set_lockh == NULL)
254                 GOTO(out_set, rc = -ENOMEM);
255         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
256
257         loi = lsm->lsm_oinfo;
258         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
259                 struct lov_request *req;
260                 obd_off start, end;
261
262                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
263                                            policy->l_extent.end, &start, &end))
264                         continue;
265
266                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
267                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
268                         continue;
269                 }
270
271                 OBD_ALLOC(req, sizeof(*req));
272                 if (req == NULL)
273                         GOTO(out_set, rc = -ENOMEM);
274
275                 req->rq_buflen = sizeof(*req->rq_md) +
276                         sizeof(struct lov_oinfo);
277                 OBD_ALLOC(req->rq_md, req->rq_buflen);
278                 if (req->rq_md == NULL)
279                         GOTO(out_set, rc = -ENOMEM);
280
281                 req->rq_extent.start = start;
282                 req->rq_extent.end = end;
283
284                 req->rq_idx = loi->loi_ost_idx;
285                 req->rq_stripe = i;
286
287                 /* XXX LOV STACKING: submd should be from the subobj */
288                 req->rq_md->lsm_object_id = loi->loi_id;
289                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
290                 req->rq_md->lsm_stripe_count = 0;
291                 req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
292                 req->rq_md->lsm_oinfo->loi_rss = loi->loi_rss;
293                 req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms;
294                 req->rq_md->lsm_oinfo->loi_blocks = loi->loi_blocks;
295                 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
296
297                 lov_set_add_req(req, set);
298         }
299         if (!set->set_count)
300                 GOTO(out_set, rc = -EIO);
301         *reqset = set;
302         RETURN(0);
303 out_set:
304         lov_fini_enqueue_set(set, mode);
305         RETURN(rc);
306 }
307
308 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
309                          int rc)
310 {
311         int ret = rc;
312         ENTRY;
313
314         if (rc == 1)
315                 ret = 0;
316         lov_update_set(set, req, ret);
317         RETURN(rc);
318 }
319
320 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
321 {
322         int rc = 0;
323         ENTRY;
324
325         LASSERT(set->set_exp);
326         if (set == NULL)
327                 RETURN(0);
328         if (set->set_completes) {
329                 if (set->set_count == set->set_success &&
330                     flags & LDLM_FL_TEST_LOCK)
331                         lov_llh_put(set->set_lockh);
332                 rc = enqueue_done(set, mode);
333         } else {
334                 lov_llh_put(set->set_lockh);
335         }
336
337         if (atomic_dec_and_test(&set->set_refcount))
338                 lov_finish_set(set);
339
340         RETURN(rc);
341 }
342
343 int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
344                        ldlm_policy_data_t *policy, __u32 mode,
345                        struct lustre_handle *lockh,
346                        struct lov_request_set **reqset)
347 {
348         struct lov_obd *lov = &exp->exp_obd->u.lov;
349         struct lov_request_set *set;
350         int i, rc = 0;
351         struct lov_oinfo *loi;
352         ENTRY;
353
354         OBD_ALLOC(set, sizeof(*set));
355         if (set == NULL)
356                 RETURN(-ENOMEM);
357         lov_init_set(set);
358
359         set->set_exp = exp;
360         set->set_md = lsm;
361         set->set_lockh = lov_llh_new(lsm);
362         if (set->set_lockh == NULL)
363                 GOTO(out_set, rc = -ENOMEM);
364         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
365
366         loi = lsm->lsm_oinfo;
367         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
368                 struct lov_request *req;
369                 obd_off start, end;
370
371                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
372                                            policy->l_extent.end, &start, &end))
373                         continue;
374
375                 /* FIXME raid1 should grace this error */
376                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
377                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
378                         GOTO(out_set, rc = -EIO);
379                 }
380
381                 OBD_ALLOC(req, sizeof(*req));
382                 if (req == NULL)
383                         GOTO(out_set, rc = -ENOMEM);
384
385                 req->rq_buflen = sizeof(*req->rq_md);
386                 OBD_ALLOC(req->rq_md, req->rq_buflen);
387                 if (req->rq_md == NULL)
388                         GOTO(out_set, rc = -ENOMEM);
389
390                 req->rq_extent.start = start;
391                 req->rq_extent.end = end;
392
393                 req->rq_idx = loi->loi_ost_idx;
394                 req->rq_stripe = i;
395
396                 /* XXX LOV STACKING: submd should be from the subobj */
397                 req->rq_md->lsm_object_id = loi->loi_id;
398                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
399                 req->rq_md->lsm_stripe_count = 0;
400                 lov_set_add_req(req, set);
401         }
402         if (!set->set_count)
403                 GOTO(out_set, rc = -EIO);
404         *reqset = set;
405         RETURN(rc);
406 out_set:
407         lov_fini_match_set(set, mode, 0);
408         RETURN(rc);
409 }
410
411 int lov_fini_cancel_set(struct lov_request_set *set)
412 {
413         int rc = 0;
414         ENTRY;
415
416         LASSERT(set->set_exp);
417         if (set == NULL)
418                 RETURN(0);
419
420         if (set->set_lockh)
421                 lov_llh_put(set->set_lockh);
422
423         if (atomic_dec_and_test(&set->set_refcount))
424                 lov_finish_set(set);
425
426         RETURN(rc);
427 }
428
429 int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
430                         __u32 mode, struct lustre_handle *lockh,
431                         struct lov_request_set **reqset)
432 {
433         struct lov_request_set *set;
434         int i, rc = 0;
435         struct lov_oinfo *loi;
436         ENTRY;
437
438         OBD_ALLOC(set, sizeof(*set));
439         if (set == NULL)
440                 RETURN(-ENOMEM);
441         lov_init_set(set);
442
443         set->set_exp = exp;
444         set->set_md = lsm;
445         set->set_lockh = lov_handle2llh(lockh);
446         if (set->set_lockh == NULL) {
447                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
448                 GOTO(out_set, rc = -EINVAL);
449         }
450         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
451
452         loi = lsm->lsm_oinfo;
453         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
454                 struct lov_request *req;
455                 struct lustre_handle *lov_lockhp;
456
457                 lov_lockhp = set->set_lockh->llh_handles + i;
458                 if (lov_lockhp->cookie == 0) {
459                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
460                                loi->loi_ost_idx, loi->loi_id);
461                         continue;
462                 }
463
464                 OBD_ALLOC(req, sizeof(*req));
465                 if (req == NULL)
466                         GOTO(out_set, rc = -ENOMEM);
467
468                 req->rq_buflen = sizeof(*req->rq_md);
469                 OBD_ALLOC(req->rq_md, req->rq_buflen);
470                 if (req->rq_md == NULL)
471                         GOTO(out_set, rc = -ENOMEM);
472
473                 req->rq_idx = loi->loi_ost_idx;
474                 req->rq_stripe = i;
475
476                 /* XXX LOV STACKING: submd should be from the subobj */
477                 req->rq_md->lsm_object_id = loi->loi_id;
478                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
479                 req->rq_md->lsm_stripe_count = 0;
480                 lov_set_add_req(req, set);
481         }
482         if (!set->set_count)
483                 GOTO(out_set, rc = -EIO);
484         *reqset = set;
485         RETURN(rc);
486 out_set:
487         lov_fini_cancel_set(set);
488         RETURN(rc);
489 }
490
491 static int create_done(struct obd_export *exp, struct lov_request_set *set,
492                        struct lov_stripe_md **ea)
493 {
494         struct lov_obd *lov = &exp->exp_obd->u.lov;
495         struct obd_trans_info *oti = set->set_oti;
496         struct obdo *src_oa = set->set_oa;
497         struct list_head *pos;
498         struct lov_request *req;
499         struct obdo *ret_oa = NULL;
500         int attrset = 0, rc = 0;
501         ENTRY;
502
503         LASSERT(set->set_completes);
504
505         if (!set->set_success)
506                 GOTO(cleanup, rc = -EIO);
507
508         if (*ea == NULL && set->set_count != set->set_success) {
509                 set->set_count = set->set_success;
510                 qos_shrink_lsm(set);
511         }
512
513         ret_oa = obdo_alloc();
514         if (ret_oa == NULL)
515                 GOTO(cleanup, rc = -ENOMEM);
516
517         list_for_each (pos, &set->set_list) {
518                 req = list_entry(pos, struct lov_request, rq_link);
519                 if (!req->rq_complete || req->rq_rc)
520                         continue;
521                 lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid,
522                                 set->set_md, req->rq_stripe, &attrset);
523         }
524         if (src_oa->o_valid & OBD_MD_FLSIZE &&
525             ret_oa->o_size != src_oa->o_size) {
526                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
527                        src_oa->o_size, ret_oa->o_size);
528                 LBUG();
529         }
530         ret_oa->o_id = src_oa->o_id;
531         ret_oa->o_gr = src_oa->o_gr;
532         ret_oa->o_valid |= OBD_MD_FLGROUP;
533         memcpy(src_oa, ret_oa, sizeof(*src_oa));
534         obdo_free(ret_oa);
535
536         *ea = set->set_md;
537         GOTO(done, rc = 0);
538
539         EXIT;
540 cleanup:
541         list_for_each (pos, &set->set_list) {
542                 struct obd_export *sub_exp;
543                 int err = 0;
544                 req = list_entry(pos, struct lov_request, rq_link);
545
546                 if (!req->rq_complete || req->rq_rc)
547                         continue;
548
549                 sub_exp = lov->tgts[req->rq_idx].ltd_exp,
550                 err = obd_destroy(sub_exp, req->rq_oa, NULL, oti);
551                 if (err)
552                         CERROR("Failed to uncreate objid "LPX64" subobj "
553                                LPX64" on OST idx %d: rc = %d\n",
554                                set->set_oa->o_id, req->rq_oa->o_id,
555                                req->rq_idx, rc);
556         }
557         if (*ea == NULL)
558                 obd_free_memmd(exp, &set->set_md);
559 done:
560         if (oti && set->set_cookies) {
561                 oti->oti_logcookies = set->set_cookies;
562                 if (!set->set_cookie_sent) {
563                         oti_free_cookies(oti);
564                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
565                 } else {
566                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
567                 }
568         }
569         return rc;
570 }
571
572 int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea)
573 {
574         int rc = 0;
575         ENTRY;
576
577         LASSERT(set->set_exp);
578         if (set == NULL)
579                 RETURN(0);
580         if (set->set_completes) {
581                 rc = create_done(set->set_exp, set, ea);
582                 /* FIXME update qos data here */
583         }
584
585         if (atomic_dec_and_test(&set->set_refcount))
586                 lov_finish_set(set);
587
588         RETURN(rc);
589 }
590
591 int lov_update_create_set(struct lov_request_set *set,
592                           struct lov_request *req, int rc)
593 {
594         struct obd_trans_info *oti = set->set_oti;
595         struct lov_stripe_md *lsm = set->set_md;
596         struct lov_oinfo *loi;
597         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
598         ENTRY;
599
600         req->rq_stripe = set->set_success;
601         loi = &lsm->lsm_oinfo[req->rq_stripe];
602
603         if (rc && lov->tgts[req->rq_idx].active) {
604                 CERROR("error creating objid "LPX64" sub-object"
605                        " on OST idx %d/%d: rc = %d\n",
606                        set->set_oa->o_id, req->rq_idx,
607                        lsm->lsm_stripe_count, rc);
608                 if (rc > 0) {
609                         CERROR("obd_create returned invalid err %d\n", rc);
610                         rc = -EIO;
611                 }
612         }
613         lov_update_set(set, req, rc);
614         if (rc)
615                 RETURN(rc);
616
617         if (oti && oti->oti_objid)
618                 oti->oti_objid[req->rq_idx] = req->rq_oa->o_id;
619
620         loi->loi_id = req->rq_oa->o_id;
621         loi->loi_gr = req->rq_oa->o_gr;
622         loi->loi_ost_idx = req->rq_idx;
623         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
624                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
625         loi_init(loi);
626
627         if (set->set_cookies)
628                 ++oti->oti_logcookies;
629         if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE)
630                 set->set_cookie_sent++;
631
632         RETURN(0);
633 }
634
635 int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **ea,
636                         struct obdo *src_oa, struct obd_trans_info *oti,
637                         struct lov_request_set **reqset)
638 {
639         struct lov_obd *lov = &exp->exp_obd->u.lov;
640         struct lov_request_set *set;
641         int rc = 0, newea = 0;
642         ENTRY;
643
644         OBD_ALLOC(set, sizeof(*set));
645         if (set == NULL)
646                 RETURN(-ENOMEM);
647         lov_init_set(set);
648
649         set->set_exp = exp;
650         set->set_md = *ea;
651         set->set_oa = src_oa;
652         set->set_oti = oti;
653
654         if (set->set_md == NULL) {
655                 int stripes, stripe_cnt;
656                 stripe_cnt = lov_get_stripecnt(lov, 0);
657
658                 /* If the MDS file was truncated up to some size, stripe over
659                  * enough OSTs to allow the file to be created at that size. */
660                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
661                         stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1;
662                         do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
663
664                         if (stripes > lov->desc.ld_active_tgt_count)
665                                 GOTO(out_set, rc = -EFBIG);
666                         if (stripes < stripe_cnt)
667                                 stripes = stripe_cnt;
668                 } else {
669                         stripes = stripe_cnt;
670                 }
671
672                 rc = lov_alloc_memmd(&set->set_md, stripes,
673                                      lov->desc.ld_pattern ?
674                                      lov->desc.ld_pattern : LOV_PATTERN_RAID0);
675                 if (rc < 0)
676                         goto out_set;
677                 newea = 1;
678         }
679
680         rc = qos_prep_create(lov, set, newea);
681         if (rc)
682                 goto out_lsm;
683
684         if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
685                 oti_alloc_cookies(oti, set->set_count);
686                 if (!oti->oti_logcookies)
687                         goto out_lsm;
688                 set->set_cookies = oti->oti_logcookies;
689         }
690         *reqset = set;
691         RETURN(rc);
692
693 out_lsm:
694         if (*ea == NULL)
695                 obd_free_memmd(exp, &set->set_md);
696 out_set:
697         lov_fini_create_set(set, ea);
698         RETURN(rc);
699 }
700
701 static int common_attr_done(struct lov_request_set *set)
702 {
703         struct list_head *pos;
704         struct lov_request *req;
705         struct obdo *tmp_oa;
706         int rc = 0, attrset = 0;
707         ENTRY;
708
709         if (set->set_oa == NULL)
710                 RETURN(0);
711
712         if (!set->set_success)
713                 RETURN(-EIO);
714
715         tmp_oa = obdo_alloc();
716         if (tmp_oa == NULL)
717                 GOTO(out, rc = -ENOMEM);
718
719         list_for_each (pos, &set->set_list) {
720                 req = list_entry(pos, struct lov_request, rq_link);
721
722                 if (!req->rq_complete || req->rq_rc)
723                         continue;
724                 if (req->rq_oa->o_valid == 0)   /* inactive stripe */
725                         continue;
726                 lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid,
727                                 set->set_md, req->rq_stripe, &attrset);
728         }
729         if (!attrset) {
730                 CERROR("No stripes had valid attrs\n");
731                 rc = -EIO;
732         }
733         tmp_oa->o_id = set->set_oa->o_id;
734         memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa));
735 out:
736         if (tmp_oa)
737                 obdo_free(tmp_oa);
738         RETURN(rc);
739
740 }
741
742 static int brw_done(struct lov_request_set *set)
743 {
744         struct lov_stripe_md *lsm = set->set_md;
745         struct lov_oinfo     *loi = NULL;
746         struct list_head *pos;
747         struct lov_request *req;
748         ENTRY;
749                                                                                                                              
750         list_for_each (pos, &set->set_list) {
751                 req = list_entry(pos, struct lov_request, rq_link);
752                                                                                                                              
753                 if (!req->rq_complete || req->rq_rc)
754                         continue;
755                                                                                                                              
756                 loi = &lsm->lsm_oinfo[req->rq_stripe];
757                                                                                                                              
758                 if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS)
759                         loi->loi_blocks = req->rq_oa->o_blocks;
760         }
761                                                                                                                              
762         RETURN(0);
763 }
764
765 int lov_fini_brw_set(struct lov_request_set *set)
766 {
767         int rc = 0;
768         ENTRY;
769
770         LASSERT(set->set_exp);
771         if (set == NULL)
772                 RETURN(0);
773         if (set->set_completes) {
774                 rc = brw_done(set);
775                 /* FIXME update qos data here */
776         }
777         if (atomic_dec_and_test(&set->set_refcount))
778                 lov_finish_set(set);
779
780         RETURN(rc);
781 }
782
783 int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
784                      struct lov_stripe_md *lsm, obd_count oa_bufs,
785                      struct brw_page *pga, struct obd_trans_info *oti,
786                      struct lov_request_set **reqset)
787 {
788         struct {
789                 obd_count       index;
790                 obd_count       count;
791                 obd_count       off;
792         } *info = NULL;
793         struct lov_request_set *set;
794         struct lov_oinfo *loi = NULL;
795         struct lov_obd *lov = &exp->exp_obd->u.lov;
796         int rc = 0, i, shift;
797         ENTRY;
798
799         OBD_ALLOC(set, sizeof(*set));
800         if (set == NULL)
801                 RETURN(-ENOMEM);
802         lov_init_set(set);
803
804         set->set_exp = exp;
805         set->set_md = lsm;
806         set->set_oa = src_oa;
807         set->set_oti = oti;
808         set->set_oabufs = oa_bufs;
809         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
810         if (!set->set_pga)
811                 GOTO(out, rc = -ENOMEM);
812
813         OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count);
814         if (!info)
815                 GOTO(out, rc = -ENOMEM);
816
817         /* calculate the page count for each stripe */
818         for (i = 0; i < oa_bufs; i++) {
819                 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
820                 info[stripe].count++;
821         }
822
823         /* alloc and initialize lov request */
824         loi = lsm->lsm_oinfo;
825         shift = 0;
826         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
827                 struct lov_request *req;
828
829                 if (info[i].count == 0)
830                         continue;
831
832                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
833                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
834                         GOTO(out, rc = -EIO);
835                 }
836
837                 OBD_ALLOC(req, sizeof(*req));
838                 if (req == NULL)
839                         GOTO(out, rc = -ENOMEM);
840
841                 req->rq_oa = obdo_alloc();
842                 if (req->rq_oa == NULL)
843                         GOTO(out, rc = -ENOMEM);
844
845                 if (src_oa)
846                         memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
847                 req->rq_oa->o_id = loi->loi_id;
848                 req->rq_buflen = sizeof(*req->rq_md);
849                 OBD_ALLOC(req->rq_md, req->rq_buflen);
850                 if (req->rq_md == NULL)
851                         GOTO(out, rc = -ENOMEM);
852
853                 req->rq_idx = loi->loi_ost_idx;
854                 req->rq_stripe = i;
855
856                 /* XXX LOV STACKING */
857                 req->rq_md->lsm_object_id = loi->loi_id;
858                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
859                 req->rq_oabufs = info[i].count;
860                 req->rq_pgaidx = shift;
861                 shift += req->rq_oabufs;
862
863                 /* remember the index for sort brw_page array */
864                 info[i].index = req->rq_pgaidx;
865                 lov_set_add_req(req, set);
866         }
867         if (!set->set_count)
868                 GOTO(out, rc = -EIO);
869
870         /* rotate & sort the brw_page array */
871         for (i = 0; i < oa_bufs; i++) {
872                 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
873
874                 shift = info[stripe].index + info[stripe].off;
875                 LASSERT(shift < oa_bufs);
876                 set->set_pga[shift] = pga[i];
877                 lov_stripe_offset(lsm, pga[i].disk_offset, stripe,
878                                   &set->set_pga[shift].disk_offset);
879                 info[stripe].off++;
880         }
881 out:
882         if (info)
883                 OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count);
884
885         if (rc == 0)
886                 *reqset = set;
887         else
888                 lov_fini_brw_set(set);
889
890         RETURN(rc);
891 }
892
/*
 * Completion step for a getattr request set: hands the set to
 * common_attr_done() and passes its return value back unchanged.
 */
static int getattr_done(struct lov_request_set *set)
{
        int rc;

        rc = common_attr_done(set);
        return rc;
}
897
898 int lov_fini_getattr_set(struct lov_request_set *set)
899 {
900         int rc = 0;
901         ENTRY;
902
903         LASSERT(set->set_exp);
904         if (set == NULL)
905                 RETURN(0);
906         if (set->set_completes)
907                 rc = getattr_done(set);
908
909         if (atomic_dec_and_test(&set->set_refcount))
910                 lov_finish_set(set);
911
912         RETURN(rc);
913 }
914
915 int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
916                          struct lov_stripe_md *lsm,
917                          struct lov_request_set **reqset)
918 {
919         struct lov_request_set *set;
920         struct lov_oinfo *loi = NULL;
921         struct lov_obd *lov = &exp->exp_obd->u.lov;
922         int rc = 0, i;
923         ENTRY;
924
925         OBD_ALLOC(set, sizeof(*set));
926         if (set == NULL)
927                 RETURN(-ENOMEM);
928         lov_init_set(set);
929
930         set->set_exp = exp;
931         set->set_md = lsm;
932         set->set_oa = src_oa;
933
934         loi = lsm->lsm_oinfo;
935         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
936                 struct lov_request *req;
937
938                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
939                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
940                         continue;
941                 }
942
943                 OBD_ALLOC(req, sizeof(*req));
944                 if (req == NULL)
945                         GOTO(out_set, rc = -ENOMEM);
946
947                 req->rq_stripe = i;
948                 req->rq_idx = loi->loi_ost_idx;
949
950                 req->rq_oa = obdo_alloc();
951                 if (req->rq_oa == NULL)
952                         GOTO(out_set, rc = -ENOMEM);
953                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
954                 req->rq_oa->o_id = loi->loi_id;
955                 lov_set_add_req(req, set);
956         }
957         if (!set->set_count)
958                 GOTO(out_set, rc = -EIO);
959         *reqset = set;
960         RETURN(rc);
961 out_set:
962         lov_fini_getattr_set(set);
963         RETURN(rc);
964 }
965
966 int lov_fini_destroy_set(struct lov_request_set *set)
967 {
968         ENTRY;
969
970         LASSERT(set->set_exp);
971         if (set == NULL)
972                 RETURN(0);
973         if (set->set_completes) {
974                 /* FIXME update qos data here */
975         }
976
977         if (atomic_dec_and_test(&set->set_refcount))
978                 lov_finish_set(set);
979
980         RETURN(0);
981 }
982
983 int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
984                          struct lov_stripe_md *lsm,
985                          struct obd_trans_info *oti,
986                          struct lov_request_set **reqset)
987 {
988         struct lov_request_set *set;
989         struct lov_oinfo *loi = NULL;
990         struct lov_obd *lov = &exp->exp_obd->u.lov;
991         int rc = 0, cookie_set = 0, i;
992         ENTRY;
993
994         OBD_ALLOC(set, sizeof(*set));
995         if (set == NULL)
996                 RETURN(-ENOMEM);
997         lov_init_set(set);
998
999         set->set_exp = exp;
1000         set->set_md = lsm;
1001         set->set_oa = src_oa;
1002         set->set_oti = oti;
1003         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1004                 set->set_cookies = oti->oti_logcookies;
1005
1006         loi = lsm->lsm_oinfo;
1007         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1008                 struct lov_request *req;
1009
1010                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1011                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1012                         continue;
1013                 }
1014
1015                 OBD_ALLOC(req, sizeof(*req));
1016                 if (req == NULL)
1017                         GOTO(out_set, rc = -ENOMEM);
1018
1019                 req->rq_stripe = i;
1020                 req->rq_idx = loi->loi_ost_idx;
1021
1022                 req->rq_oa = obdo_alloc();
1023                 if (req->rq_oa == NULL)
1024                         GOTO(out_set, rc = -ENOMEM);
1025                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1026                 req->rq_oa->o_id = loi->loi_id;
1027
1028                 /* Setup the first request's cookie position */
1029                 if (!cookie_set && set->set_cookies) {
1030                         oti->oti_logcookies = set->set_cookies + i;
1031                         cookie_set = 1;
1032                 }
1033                 lov_set_add_req(req, set);
1034         }
1035         if (!set->set_count)
1036                 GOTO(out_set, rc = -EIO);
1037         *reqset = set;
1038         RETURN(rc);
1039 out_set:
1040         lov_fini_destroy_set(set);
1041         RETURN(rc);
1042 }
1043
/*
 * Completion step for a setattr request set: hands the set to
 * common_attr_done() and passes its return value back unchanged.
 */
static int setattr_done(struct lov_request_set *set)
{
        int rc;

        rc = common_attr_done(set);
        return rc;
}
1048
1049 int lov_fini_setattr_set(struct lov_request_set *set)
1050 {
1051         int rc = 0;
1052         ENTRY;
1053
1054         LASSERT(set->set_exp);
1055         if (set == NULL)
1056                 RETURN(0);
1057         if (set->set_completes) {
1058                 rc = setattr_done(set);
1059                 /* FIXME update qos data here */
1060         }
1061
1062         if (atomic_dec_and_test(&set->set_refcount))
1063                 lov_finish_set(set);
1064         RETURN(rc);
1065 }
1066
1067 int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
1068                          struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1069                          struct lov_request_set **reqset)
1070 {
1071         struct lov_request_set *set;
1072         struct lov_oinfo *loi = NULL;
1073         struct lov_obd *lov = &exp->exp_obd->u.lov;
1074         int rc = 0, i;
1075         ENTRY;
1076
1077         OBD_ALLOC(set, sizeof(*set));
1078         if (set == NULL)
1079                 RETURN(-ENOMEM);
1080         lov_init_set(set);
1081
1082         set->set_exp = exp;
1083         set->set_md = lsm;
1084         set->set_oa = src_oa;
1085
1086         loi = lsm->lsm_oinfo;
1087         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1088                 struct lov_request *req;
1089
1090                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1091                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1092                         continue;
1093                 }
1094
1095                 OBD_ALLOC(req, sizeof(*req));
1096                 if (req == NULL)
1097                         GOTO(out_set, rc = -ENOMEM);
1098                 req->rq_stripe = i;
1099                 req->rq_idx = loi->loi_ost_idx;
1100
1101                 req->rq_oa = obdo_alloc();
1102                 if (req->rq_oa == NULL)
1103                         GOTO(out_set, rc = -ENOMEM);
1104                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1105                 req->rq_oa->o_id = loi->loi_id;
1106                 LASSERT(!(req->rq_oa->o_valid & OBD_MD_FLGROUP) || req->rq_oa->o_gr>0);
1107
1108                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
1109                         if (lov_stripe_offset(lsm, src_oa->o_size, i,
1110                                               &req->rq_oa->o_size) < 0 &&
1111                             req->rq_oa->o_size)
1112                                 req->rq_oa->o_size--;
1113                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1114                                i, req->rq_oa->o_size, src_oa->o_size);
1115                 }
1116                 lov_set_add_req(req, set);
1117         }
1118         if (!set->set_count)
1119                 GOTO(out_set, rc = -EIO);
1120         *reqset = set;
1121         RETURN(rc);
1122 out_set:
1123         lov_fini_setattr_set(set);
1124         RETURN(rc);
1125 }
1126
1127 int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
1128                          int rc)
1129 {
1130         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1131         ENTRY;
1132
1133         lov_update_set(set, req, rc);
1134         if (rc && !lov->tgts[req->rq_idx].active)
1135                 rc = 0;
1136         /* FIXME in raid1 regime, should return 0 */
1137         RETURN(rc);
1138 }
1139
1140 int lov_fini_punch_set(struct lov_request_set *set)
1141 {
1142         int rc = 0;
1143         ENTRY;
1144
1145         LASSERT(set->set_exp);
1146         if (set == NULL)
1147                 RETURN(0);
1148         if (set->set_completes) {
1149                 if (!set->set_success)
1150                         rc = -EIO;
1151                 /* FIXME update qos data here */
1152         }
1153
1154         if (atomic_dec_and_test(&set->set_refcount))
1155                 lov_finish_set(set);
1156
1157         RETURN(rc);
1158 }
1159
1160 int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
1161                        struct lov_stripe_md *lsm, obd_off start,
1162                        obd_off end, struct obd_trans_info *oti,
1163                        struct lov_request_set **reqset)
1164 {
1165         struct lov_request_set *set;
1166         struct lov_oinfo *loi = NULL;
1167         struct lov_obd *lov = &exp->exp_obd->u.lov;
1168         int rc = 0, i;
1169         ENTRY;
1170
1171         OBD_ALLOC(set, sizeof(*set));
1172         if (set == NULL)
1173                 RETURN(-ENOMEM);
1174         lov_init_set(set);
1175
1176         set->set_exp = exp;
1177         set->set_md = lsm;
1178         set->set_oa = src_oa;
1179
1180         loi = lsm->lsm_oinfo;
1181         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1182                 struct lov_request *req;
1183                 obd_off rs, re;
1184
1185                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1186                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1187                         continue;
1188                 }
1189
1190                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1191                         continue;
1192
1193                 OBD_ALLOC(req, sizeof(*req));
1194                 if (req == NULL)
1195                         GOTO(out_set, rc = -ENOMEM);
1196                 req->rq_stripe = i;
1197                 req->rq_idx = loi->loi_ost_idx;
1198
1199                 req->rq_oa = obdo_alloc();
1200                 if (req->rq_oa == NULL)
1201                         GOTO(out_set, rc = -ENOMEM);
1202                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1203                 req->rq_oa->o_id = loi->loi_id;
1204                 req->rq_oa->o_gr = loi->loi_gr;
1205                 req->rq_oa->o_valid |= OBD_MD_FLGROUP;
1206
1207                 req->rq_extent.start = rs;
1208                 req->rq_extent.end = re;
1209
1210                 lov_set_add_req(req, set);
1211         }
1212         if (!set->set_count)
1213                 GOTO(out_set, rc = -EIO);
1214         *reqset = set;
1215         RETURN(rc);
1216 out_set:
1217         lov_fini_punch_set(set);
1218         RETURN(rc);
1219 }
1220
1221 int lov_fini_sync_set(struct lov_request_set *set)
1222 {
1223         int rc = 0;
1224         ENTRY;
1225
1226         LASSERT(set->set_exp);
1227         if (set == NULL)
1228                 RETURN(0);
1229         if (set->set_completes) {
1230                 if (!set->set_success)
1231                         rc = -EIO;
1232                 /* FIXME update qos data here */
1233         }
1234
1235         if (atomic_dec_and_test(&set->set_refcount))
1236                 lov_finish_set(set);
1237
1238         RETURN(rc);
1239 }
1240
1241 int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
1242                       struct lov_stripe_md *lsm, obd_off start,
1243                       obd_off end, struct lov_request_set **reqset)
1244 {
1245         struct lov_request_set *set;
1246         struct lov_oinfo *loi = NULL;
1247         struct lov_obd *lov = &exp->exp_obd->u.lov;
1248         int rc = 0, i;
1249         ENTRY;
1250
1251         OBD_ALLOC(set, sizeof(*set));
1252         if (set == NULL)
1253                 RETURN(-ENOMEM);
1254         lov_init_set(set);
1255
1256         set->set_exp = exp;
1257         set->set_md = lsm;
1258         set->set_oa = src_oa;
1259
1260         loi = lsm->lsm_oinfo;
1261         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1262                 struct lov_request *req;
1263                 obd_off rs, re;
1264
1265                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1266                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1267                         continue;
1268                 }
1269
1270                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1271                         continue;
1272
1273                 OBD_ALLOC(req, sizeof(*req));
1274                 if (req == NULL)
1275                         GOTO(out_set, rc = -ENOMEM);
1276                 req->rq_stripe = i;
1277                 req->rq_idx = loi->loi_ost_idx;
1278
1279                 req->rq_oa = obdo_alloc();
1280                 if (req->rq_oa == NULL)
1281                         GOTO(out_set, rc = -ENOMEM);
1282                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1283                 req->rq_oa->o_id = loi->loi_id;
1284                 req->rq_extent.start = rs;
1285                 req->rq_extent.end = re;
1286                 lov_set_add_req(req, set);
1287         }
1288         if (!set->set_count)
1289                 GOTO(out_set, rc = -EIO);
1290         *reqset = set;
1291         RETURN(rc);
1292 out_set:
1293         lov_fini_sync_set(set);
1294         RETURN(rc);
1295 }