Whamcloud - gitweb
Land b1_8_gate onto b1_8 (20081218_1708)
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_LOV
41
42 #ifdef __KERNEL__
43 #include <libcfs/libcfs.h>
44 #else
45 #include <liblustre.h>
46 #endif
47
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre/lustre_idl.h>
51
52 #include "lov_internal.h"
53
54 static void lov_init_set(struct lov_request_set *set)
55 {
56         set->set_count = 0;
57         set->set_completes = 0;
58         set->set_success = 0;
59         set->set_cookies = 0;
60         CFS_INIT_LIST_HEAD(&set->set_list);
61         atomic_set(&set->set_refcount, 1);
62 }
63
64 static void lov_finish_set(struct lov_request_set *set)
65 {
66         struct list_head *pos, *n;
67         ENTRY;
68
69         LASSERT(set);
70         list_for_each_safe(pos, n, &set->set_list) {
71                 struct lov_request *req = list_entry(pos, struct lov_request,
72                                                      rq_link);
73                 list_del_init(&req->rq_link);
74
75                 if (req->rq_oi.oi_oa)
76                         OBDO_FREE(req->rq_oi.oi_oa);
77                 if (req->rq_oi.oi_md)
78                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
79                 if (req->rq_oi.oi_osfs)
80                         OBD_FREE(req->rq_oi.oi_osfs,
81                                  sizeof(*req->rq_oi.oi_osfs));
82                 OBD_FREE(req, sizeof(*req));
83         }
84
85         if (set->set_pga) {
86                 int len = set->set_oabufs * sizeof(*set->set_pga);
87                 OBD_FREE(set->set_pga, len);
88         }
89         if (set->set_lockh)
90                 lov_llh_put(set->set_lockh);
91
92         OBD_FREE(set, sizeof(*set));
93         EXIT;
94 }
95
96 void lov_update_set(struct lov_request_set *set,
97                     struct lov_request *req, int rc)
98 {
99         req->rq_complete = 1;
100         req->rq_rc = rc;
101
102         set->set_completes++;
103         if (rc == 0)
104                 set->set_success++;
105 }
106
107 int lov_update_common_set(struct lov_request_set *set,
108                           struct lov_request *req, int rc)
109 {
110         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
111         ENTRY;
112
113         lov_update_set(set, req, rc);
114
115         /* grace error on inactive ost */
116         if (rc && !(lov->lov_tgts[req->rq_idx] && 
117                     lov->lov_tgts[req->rq_idx]->ltd_active))
118                 rc = 0;
119
120         /* FIXME in raid1 regime, should return 0 */
121         RETURN(rc);
122 }
123
124 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
125 {
126         list_add_tail(&req->rq_link, &set->set_list);
127         set->set_count++;
128 }
129
130 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
131 {
132         struct lov_request_set *set = req->rq_rqset;
133         struct lustre_handle *lov_lockhp;
134         struct lov_oinfo *loi;
135         ENTRY;
136
137         LASSERT(set != NULL);
138         LASSERT(set->set_oi != NULL);
139
140         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
141         loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
142
143         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
144          * and that copy can be arbitrarily out of date.
145          *
146          * The LOV API is due for a serious rewriting anyways, and this
147          * can be addressed then. */
148
149         if (rc == ELDLM_OK) {
150                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
151                 __u64 tmp;
152
153                 LASSERT(lock != NULL);
154                 lov_stripe_lock(set->set_oi->oi_md);
155                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
156                 tmp = loi->loi_lvb.lvb_size;
157                 /* Extend KMS up to the end of this lock and no further
158                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
159                 if (tmp > lock->l_policy_data.l_extent.end)
160                         tmp = lock->l_policy_data.l_extent.end + 1;
161                 if (tmp >= loi->loi_kms) {
162                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
163                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
164                         loi->loi_kms = tmp;
165                         loi->loi_kms_valid = 1;
166                 } else {
167                         LDLM_DEBUG(lock, "lock acquired, setting rss="
168                                    LPU64"; leaving kms="LPU64", end="LPU64,
169                                    loi->loi_lvb.lvb_size, loi->loi_kms,
170                                    lock->l_policy_data.l_extent.end);
171                 }
172                 lov_stripe_unlock(set->set_oi->oi_md);
173                 ldlm_lock_allow_match(lock);
174                 LDLM_LOCK_PUT(lock);
175         } else if ((rc == ELDLM_LOCK_ABORTED) &&
176                    (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {
177                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
178                 lov_stripe_lock(set->set_oi->oi_md);
179                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
180                 lov_stripe_unlock(set->set_oi->oi_md);
181                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
182                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
183                 rc = ELDLM_OK;
184         } else {
185                 struct obd_export *exp = set->set_exp;
186                 struct lov_obd *lov = &exp->exp_obd->u.lov;
187
188                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
189                 if (lov->lov_tgts[req->rq_idx] &&
190                     lov->lov_tgts[req->rq_idx]->ltd_active) {
191                         /* -EUSERS used by OST to report file contention */
192                         if (rc != -EINTR && rc != -EUSERS)
193                                 CERROR("enqueue objid "LPX64" subobj "
194                                        LPX64" on OST idx %d: rc %d\n",
195                                        set->set_oi->oi_md->lsm_object_id,
196                                        loi->loi_id, loi->loi_ost_idx, rc);
197                 } else {
198                         rc = ELDLM_OK;
199                 }
200         }
201         lov_update_set(set, req, rc);
202         RETURN(rc);
203 }
204
205 /* The callback for osc_enqueue that updates lov info for every OSC request. */
206 static int cb_update_enqueue(struct obd_info *oinfo, int rc)
207 {
208         struct ldlm_enqueue_info *einfo;
209         struct lov_request *lovreq;
210
211         lovreq = container_of(oinfo, struct lov_request, rq_oi);
212         einfo = lovreq->rq_rqset->set_ei;
213         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
214 }
215
216 static int enqueue_done(struct lov_request_set *set, __u32 mode)
217 {
218         struct lov_request *req;
219         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
220         int rc = 0;
221         ENTRY;
222
223         /* enqueue/match success, just return */
224         if (set->set_completes && set->set_completes == set->set_success)
225                 RETURN(0);
226
227         /* cancel enqueued/matched locks */
228         list_for_each_entry(req, &set->set_list, rq_link) {
229                 struct lustre_handle *lov_lockhp;
230
231                 if (!req->rq_complete || req->rq_rc)
232                         continue;
233
234                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
235                 LASSERT(lov_lockhp);
236                 if (!lustre_handle_is_used(lov_lockhp))
237                         continue;
238
239                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
240                                 req->rq_oi.oi_md, mode, lov_lockhp);
241                 if (rc && lov->lov_tgts[req->rq_idx] &&
242                     lov->lov_tgts[req->rq_idx]->ltd_active)
243                         CERROR("cancelling obdjid "LPX64" on OST "
244                                "idx %d error: rc = %d\n",
245                                req->rq_oi.oi_md->lsm_object_id,
246                                req->rq_idx, rc);
247         }
248         if (set->set_lockh)
249                 lov_llh_put(set->set_lockh);
250         RETURN(rc);
251 }
252
253 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
254                          struct ptlrpc_request_set *rqset)
255 {
256         int ret = 0;
257         ENTRY;
258
259         if (set == NULL)
260                 RETURN(0);
261         LASSERT(set->set_exp);
262         /* Do enqueue_done only for sync requests and if any request
263          * succeeded. */
264         if (!rqset) {
265                 if (rc)
266                         set->set_completes = 0;
267                 ret = enqueue_done(set, mode);
268         } else if (set->set_lockh)
269                 lov_llh_put(set->set_lockh);
270
271         if (atomic_dec_and_test(&set->set_refcount))
272                 lov_finish_set(set);
273
274         RETURN(rc ? rc : ret);
275 }
276
277 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
278                          struct ldlm_enqueue_info *einfo,
279                          struct lov_request_set **reqset)
280 {
281         struct lov_obd *lov = &exp->exp_obd->u.lov;
282         struct lov_request_set *set;
283         int i, rc = 0;
284         struct lov_oinfo *loi;
285         ENTRY;
286
287         OBD_ALLOC(set, sizeof(*set));
288         if (set == NULL)
289                 RETURN(-ENOMEM);
290         lov_init_set(set);
291
292         set->set_exp = exp;
293         set->set_oi = oinfo;
294         set->set_ei = einfo;
295         set->set_lockh = lov_llh_new(oinfo->oi_md);
296         if (set->set_lockh == NULL)
297                 GOTO(out_set, rc = -ENOMEM);
298         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
299
300         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
301                 struct lov_request *req;
302                 obd_off start, end;
303
304                 loi = oinfo->oi_md->lsm_oinfo[i];
305                 if (!lov_stripe_intersects(oinfo->oi_md, i,
306                                            oinfo->oi_policy.l_extent.start,
307                                            oinfo->oi_policy.l_extent.end,
308                                            &start, &end))
309                         continue;
310
311                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
312                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
313                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
314                         continue;
315                 }
316
317                 OBD_ALLOC(req, sizeof(*req));
318                 if (req == NULL)
319                         GOTO(out_set, rc = -ENOMEM);
320
321                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
322                         sizeof(struct lov_oinfo *) +
323                         sizeof(struct lov_oinfo);
324                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
325                 if (req->rq_oi.oi_md == NULL) {
326                         OBD_FREE(req, sizeof(*req));
327                         GOTO(out_set, rc = -ENOMEM);
328                 }
329                 req->rq_oi.oi_md->lsm_oinfo[0] =
330                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
331                         sizeof(struct lov_oinfo *);
332
333
334                 req->rq_rqset = set;
335                 /* Set lov request specific parameters. */
336                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
337                 req->rq_oi.oi_cb_up = cb_update_enqueue;
338                 req->rq_oi.oi_flags = oinfo->oi_flags;
339
340                 LASSERT(req->rq_oi.oi_lockh);
341
342                 req->rq_oi.oi_policy.l_extent.gid =
343                         oinfo->oi_policy.l_extent.gid;
344                 req->rq_oi.oi_policy.l_extent.start = start;
345                 req->rq_oi.oi_policy.l_extent.end = end;
346
347                 req->rq_idx = loi->loi_ost_idx;
348                 req->rq_stripe = i;
349
350                 /* XXX LOV STACKING: submd should be from the subobj */
351                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
352                 req->rq_oi.oi_md->lsm_object_gr = loi->loi_gr;
353                 req->rq_oi.oi_md->lsm_stripe_count = 0;
354                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
355                         loi->loi_kms_valid;
356                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
357                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
358
359                 lov_set_add_req(req, set);
360         }
361         if (!set->set_count)
362                 GOTO(out_set, rc = -EIO);
363         *reqset = set;
364         RETURN(0);
365 out_set:
366         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
367         RETURN(rc);
368 }
369
370 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
371                          int rc)
372 {
373         int ret = rc;
374         ENTRY;
375
376         if (rc > 0)
377                 ret = 0;
378         else if (rc == 0)
379                 ret = 1;
380         lov_update_set(set, req, ret);
381         RETURN(rc);
382 }
383
384 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
385 {
386         int rc = 0;
387         ENTRY;
388
389         if (set == NULL)
390                 RETURN(0);
391         LASSERT(set->set_exp);
392         rc = enqueue_done(set, mode);
393         if ((set->set_count == set->set_success) &&
394             (flags & LDLM_FL_TEST_LOCK))
395                 lov_llh_put(set->set_lockh);
396
397         if (atomic_dec_and_test(&set->set_refcount))
398                 lov_finish_set(set);
399
400         RETURN(rc);
401 }
402
403 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
404                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
405                        __u32 mode, struct lustre_handle *lockh,
406                        struct lov_request_set **reqset)
407 {
408         struct lov_obd *lov = &exp->exp_obd->u.lov;
409         struct lov_request_set *set;
410         int i, rc = 0;
411         struct lov_oinfo *loi;
412         ENTRY;
413
414         OBD_ALLOC(set, sizeof(*set));
415         if (set == NULL)
416                 RETURN(-ENOMEM);
417         lov_init_set(set);
418
419         set->set_exp = exp;
420         set->set_oi = oinfo;
421         set->set_oi->oi_md = lsm;
422         set->set_lockh = lov_llh_new(lsm);
423         if (set->set_lockh == NULL)
424                 GOTO(out_set, rc = -ENOMEM);
425         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
426
427         for (i = 0; i < lsm->lsm_stripe_count; i++){
428                 struct lov_request *req;
429                 obd_off start, end;
430
431                 loi = lsm->lsm_oinfo[i];
432                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
433                                            policy->l_extent.end, &start, &end))
434                         continue;
435
436                 /* FIXME raid1 should grace this error */
437                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
438                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
439                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
440                         GOTO(out_set, rc = -EIO);
441                 }
442
443                 OBD_ALLOC(req, sizeof(*req));
444                 if (req == NULL)
445                         GOTO(out_set, rc = -ENOMEM);
446
447                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
448                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
449                 if (req->rq_oi.oi_md == NULL) {
450                         OBD_FREE(req, sizeof(*req));
451                         GOTO(out_set, rc = -ENOMEM);
452                 }
453
454                 req->rq_oi.oi_policy.l_extent.start = start;
455                 req->rq_oi.oi_policy.l_extent.end = end;
456                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
457
458                 req->rq_idx = loi->loi_ost_idx;
459                 req->rq_stripe = i;
460
461                 /* XXX LOV STACKING: submd should be from the subobj */
462                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
463                 req->rq_oi.oi_md->lsm_object_gr = loi->loi_gr;
464                 req->rq_oi.oi_md->lsm_stripe_count = 0;
465
466                 lov_set_add_req(req, set);
467         }
468         if (!set->set_count)
469                 GOTO(out_set, rc = -EIO);
470         *reqset = set;
471         RETURN(rc);
472 out_set:
473         lov_fini_match_set(set, mode, 0);
474         RETURN(rc);
475 }
476
477 int lov_fini_cancel_set(struct lov_request_set *set)
478 {
479         int rc = 0;
480         ENTRY;
481
482         if (set == NULL)
483                 RETURN(0);
484
485         LASSERT(set->set_exp);
486         if (set->set_lockh)
487                 lov_llh_put(set->set_lockh);
488
489         if (atomic_dec_and_test(&set->set_refcount))
490                 lov_finish_set(set);
491
492         RETURN(rc);
493 }
494
495 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
496                         struct lov_stripe_md *lsm, __u32 mode,
497                         struct lustre_handle *lockh,
498                         struct lov_request_set **reqset)
499 {
500         struct lov_request_set *set;
501         int i, rc = 0;
502         struct lov_oinfo *loi;
503         ENTRY;
504
505         OBD_ALLOC(set, sizeof(*set));
506         if (set == NULL)
507                 RETURN(-ENOMEM);
508         lov_init_set(set);
509
510         set->set_exp = exp;
511         set->set_oi = oinfo;
512         set->set_oi->oi_md = lsm;
513         set->set_lockh = lov_handle2llh(lockh);
514         if (set->set_lockh == NULL) {
515                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
516                 GOTO(out_set, rc = -EINVAL);
517         }
518         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
519
520         for (i = 0; i < lsm->lsm_stripe_count; i++){
521                 struct lov_request *req;
522                 struct lustre_handle *lov_lockhp;
523
524                 loi = lsm->lsm_oinfo[i];
525                 lov_lockhp = set->set_lockh->llh_handles + i;
526                 if (!lustre_handle_is_used(lov_lockhp)) {
527                         CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n",
528                                loi->loi_ost_idx, loi->loi_id);
529                         continue;
530                 }
531
532                 OBD_ALLOC(req, sizeof(*req));
533                 if (req == NULL)
534                         GOTO(out_set, rc = -ENOMEM);
535
536                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
537                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
538                 if (req->rq_oi.oi_md == NULL) {
539                         OBD_FREE(req, sizeof(*req));
540                         GOTO(out_set, rc = -ENOMEM);
541                 }
542
543                 req->rq_idx = loi->loi_ost_idx;
544                 req->rq_stripe = i;
545
546                 /* XXX LOV STACKING: submd should be from the subobj */
547                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
548                 req->rq_oi.oi_md->lsm_stripe_count = 0;
549
550                 lov_set_add_req(req, set);
551         }
552         if (!set->set_count)
553                 GOTO(out_set, rc = -EIO);
554         *reqset = set;
555         RETURN(rc);
556 out_set:
557         lov_fini_cancel_set(set);
558         RETURN(rc);
559 }
560
561 static int create_done(struct obd_export *exp, struct lov_request_set *set,
562                        struct lov_stripe_md **lsmp)
563 {
564         struct lov_obd *lov = &exp->exp_obd->u.lov;
565         struct obd_trans_info *oti = set->set_oti;
566         struct obdo *src_oa = set->set_oi->oi_oa;
567         struct lov_request *req;
568         struct obdo *ret_oa = NULL;
569         int attrset = 0, rc = 0;
570         ENTRY;
571
572         LASSERT(set->set_completes);
573
574         /* try alloc objects on other osts if osc_create fails for
575          * exceptions: RPC failure, ENOSPC, etc */
576         if (set->set_count != set->set_success) {
577                 list_for_each_entry (req, &set->set_list, rq_link) {
578                         if (req->rq_rc == 0)
579                                 continue;
580
581                         set->set_completes--;
582                         req->rq_complete = 0;
583
584                         rc = qos_remedy_create(set, req);
585                         lov_update_create_set(set, req, rc);
586
587                         if (rc)
588                                 break;
589                 }
590         }
591
592         /* no successful creates */
593         if (set->set_success == 0)
594                 GOTO(cleanup, rc);
595
596         /* If there was an explicit stripe set, fail.  Otherwise, we
597          * got some objects and that's not bad. */
598         if (set->set_count != set->set_success) {
599                 if (*lsmp)
600                         GOTO(cleanup, rc);
601                 set->set_count = set->set_success;
602                 qos_shrink_lsm(set);
603         }
604
605         OBDO_ALLOC(ret_oa);
606         if (ret_oa == NULL)
607                 GOTO(cleanup, rc = -ENOMEM);
608
609         list_for_each_entry(req, &set->set_list, rq_link) {
610                 if (!req->rq_complete || req->rq_rc)
611                         continue;
612                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
613                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
614                                 req->rq_stripe, &attrset);
615         }
616         if (src_oa->o_valid & OBD_MD_FLSIZE &&
617             ret_oa->o_size != src_oa->o_size) {
618                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
619                        src_oa->o_size, ret_oa->o_size);
620                 LBUG();
621         }
622         ret_oa->o_id = src_oa->o_id;
623         memcpy(src_oa, ret_oa, sizeof(*src_oa));
624         OBDO_FREE(ret_oa);
625
626         *lsmp = set->set_oi->oi_md;
627         GOTO(done, rc = 0);
628
629 cleanup:
630         list_for_each_entry(req, &set->set_list, rq_link) {
631                 struct obd_export *sub_exp;
632                 int err = 0;
633
634                 if (!req->rq_complete || req->rq_rc)
635                         continue;
636
637                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
638                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
639                 if (err)
640                         CERROR("Failed to uncreate objid "LPX64" subobj "
641                                LPX64" on OST idx %d: rc = %d\n",
642                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
643                                req->rq_idx, rc);
644         }
645         if (*lsmp == NULL)
646                 obd_free_memmd(exp, &set->set_oi->oi_md);
647 done:
648         if (oti && set->set_cookies) {
649                 oti->oti_logcookies = set->set_cookies;
650                 if (!set->set_cookie_sent) {
651                         oti_free_cookies(oti);
652                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
653                 } else {
654                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
655                 }
656         }
657         RETURN(rc);
658 }
659
660 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
661 {
662         int rc = 0;
663         ENTRY;
664
665         if (set == NULL)
666                 RETURN(0);
667         LASSERT(set->set_exp);
668         if (set->set_completes)
669                 rc = create_done(set->set_exp, set, lsmp);
670
671         if (atomic_dec_and_test(&set->set_refcount))
672                 lov_finish_set(set);
673
674         RETURN(rc);
675 }
676
677 int lov_update_create_set(struct lov_request_set *set,
678                           struct lov_request *req, int rc)
679 {
680         struct obd_trans_info *oti = set->set_oti;
681         struct lov_stripe_md *lsm = set->set_oi->oi_md;
682         struct lov_oinfo *loi;
683         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
684         ENTRY;
685
686         req->rq_stripe = set->set_success;
687         loi = lsm->lsm_oinfo[req->rq_stripe];
688
689         if (rc && lov->lov_tgts[req->rq_idx] &&
690             lov->lov_tgts[req->rq_idx]->ltd_active) {
691                 CERROR("error creating fid "LPX64" sub-object"
692                        " on OST idx %d/%d: rc = %d\n",
693                        set->set_oi->oi_oa->o_id, req->rq_idx,
694                        lsm->lsm_stripe_count, rc);
695                 if (rc > 0) {
696                         CERROR("obd_create returned invalid err %d\n", rc);
697                         rc = -EIO;
698                 }
699         }
700         lov_update_set(set, req, rc);
701         if (rc)
702                 RETURN(rc);
703
704         loi->loi_id = req->rq_oi.oi_oa->o_id;
705         loi->loi_ost_idx = req->rq_idx;
706         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
707                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
708         loi_init(loi);
709
710         if (oti && set->set_cookies)
711                 ++oti->oti_logcookies;
712         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
713                 set->set_cookie_sent++;
714
715         RETURN(0);
716 }
717
718 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
719                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
720                         struct obd_trans_info *oti,
721                         struct lov_request_set **reqset)
722 {
723         struct lov_request_set *set;
724         int rc = 0;
725         ENTRY;
726
727         OBD_ALLOC(set, sizeof(*set));
728         if (set == NULL)
729                 RETURN(-ENOMEM);
730         lov_init_set(set);
731
732         set->set_exp = exp;
733         set->set_oi = oinfo;
734         set->set_oi->oi_md = *lsmp;
735         set->set_oi->oi_oa = src_oa;
736         set->set_oti = oti;
737
738         rc = qos_prep_create(exp, set);
739         if (rc)
740                 lov_fini_create_set(set, lsmp);
741         else
742                 *reqset = set;
743         RETURN(rc);
744 }
745
746 static int common_attr_done(struct lov_request_set *set)
747 {
748         struct list_head *pos;
749         struct lov_request *req;
750         struct obdo *tmp_oa;
751         int rc = 0, attrset = 0;
752         ENTRY;
753
754         LASSERT(set->set_oi != NULL);
755
756         if (set->set_oi->oi_oa == NULL)
757                 RETURN(0);
758
759         if (!set->set_success)
760                 RETURN(-EIO);
761
762         OBDO_ALLOC(tmp_oa);
763         if (tmp_oa == NULL)
764                 GOTO(out, rc = -ENOMEM);
765
766         list_for_each (pos, &set->set_list) {
767                 req = list_entry(pos, struct lov_request, rq_link);
768
769                 if (!req->rq_complete || req->rq_rc)
770                         continue;
771                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
772                         continue;
773                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
774                                 req->rq_oi.oi_oa->o_valid,
775                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
776         }
777         if (!attrset) {
778                 CERROR("No stripes had valid attrs\n");
779                 rc = -EIO;
780         }
781         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
782         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
783 out:
784         if (tmp_oa)
785                 OBDO_FREE(tmp_oa);
786         RETURN(rc);
787
788 }
789
790 static int brw_done(struct lov_request_set *set)
791 {
792         struct lov_stripe_md *lsm = set->set_oi->oi_md;
793         struct lov_oinfo     *loi = NULL;
794         struct list_head *pos;
795         struct lov_request *req;
796         ENTRY;
797
798         list_for_each (pos, &set->set_list) {
799                 req = list_entry(pos, struct lov_request, rq_link);
800
801                 if (!req->rq_complete || req->rq_rc)
802                         continue;
803
804                 loi = lsm->lsm_oinfo[req->rq_stripe];
805
806                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
807                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
808         }
809
810         RETURN(0);
811 }
812
813 int lov_fini_brw_set(struct lov_request_set *set)
814 {
815         int rc = 0;
816         ENTRY;
817
818         if (set == NULL)
819                 RETURN(0);
820         LASSERT(set->set_exp);
821         if (set->set_completes) {
822                 rc = brw_done(set);
823                 /* FIXME update qos data here */
824         }
825         if (atomic_dec_and_test(&set->set_refcount))
826                 lov_finish_set(set);
827
828         RETURN(rc);
829 }
830
831 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
832                      obd_count oa_bufs, struct brw_page *pga,
833                      struct obd_trans_info *oti,
834                      struct lov_request_set **reqset)
835 {
836         struct {
837                 obd_count       index;
838                 obd_count       count;
839                 obd_count       off;
840         } *info = NULL;
841         struct lov_request_set *set;
842         struct lov_oinfo *loi = NULL;
843         struct lov_obd *lov = &exp->exp_obd->u.lov;
844         int rc = 0, i, shift;
845         ENTRY;
846
847         OBD_ALLOC(set, sizeof(*set));
848         if (set == NULL)
849                 RETURN(-ENOMEM);
850         lov_init_set(set);
851
852         set->set_exp = exp;
853         set->set_oti = oti;
854         set->set_oi = oinfo;
855         set->set_oabufs = oa_bufs;
856         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
857         if (!set->set_pga)
858                 GOTO(out, rc = -ENOMEM);
859
860         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
861         if (!info)
862                 GOTO(out, rc = -ENOMEM);
863
864         /* calculate the page count for each stripe */
865         for (i = 0; i < oa_bufs; i++) {
866                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
867                 info[stripe].count++;
868         }
869
870         /* alloc and initialize lov request */
871         shift = 0;
872         for (i = 0 ; i < oinfo->oi_md->lsm_stripe_count; i++){
873                 struct lov_request *req;
874
875                 if (info[i].count == 0)
876                         continue;
877
878                 loi = oinfo->oi_md->lsm_oinfo[i];
879                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
880                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
881                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
882                         GOTO(out, rc = -EIO);
883                 }
884
885                 OBD_ALLOC(req, sizeof(*req));
886                 if (req == NULL)
887                         GOTO(out, rc = -ENOMEM);
888
889                 OBDO_ALLOC(req->rq_oi.oi_oa);
890                 if (req->rq_oi.oi_oa == NULL) {
891                         OBD_FREE(req, sizeof(*req));
892                         GOTO(out, rc = -ENOMEM);
893                 }
894
895                 if (oinfo->oi_oa) {
896                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
897                                sizeof(*req->rq_oi.oi_oa));
898                 }
899                 req->rq_oi.oi_oa->o_id = loi->loi_id;
900                 req->rq_oi.oi_oa->o_stripe_idx = i;
901
902                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
903                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
904                 if (req->rq_oi.oi_md == NULL) {
905                         OBDO_FREE(req->rq_oi.oi_oa);
906                         OBD_FREE(req, sizeof(*req));
907                         GOTO(out, rc = -ENOMEM);
908                 }
909
910                 req->rq_idx = loi->loi_ost_idx;
911                 req->rq_stripe = i;
912
913                 /* XXX LOV STACKING */
914                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
915                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
916                 req->rq_oabufs = info[i].count;
917                 req->rq_pgaidx = shift;
918                 shift += req->rq_oabufs;
919
920                 /* remember the index for sort brw_page array */
921                 info[i].index = req->rq_pgaidx;
922
923                 lov_set_add_req(req, set);
924         }
925         if (!set->set_count)
926                 GOTO(out, rc = -EIO);
927
928         /* rotate & sort the brw_page array */
929         for (i = 0; i < oa_bufs; i++) {
930                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
931
932                 shift = info[stripe].index + info[stripe].off;
933                 LASSERT(shift < oa_bufs);
934                 set->set_pga[shift] = pga[i];
935                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
936                                   &set->set_pga[shift].off);
937                 info[stripe].off++;
938         }
939 out:
940         if (info)
941                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
942
943         if (rc == 0)
944                 *reqset = set;
945         else
946                 lov_fini_brw_set(set);
947
948         RETURN(rc);
949 }
950
951 int lov_fini_getattr_set(struct lov_request_set *set)
952 {
953         int rc = 0;
954         ENTRY;
955
956         if (set == NULL)
957                 RETURN(0);
958         LASSERT(set->set_exp);
959         if (set->set_completes)
960                 rc = common_attr_done(set);
961
962         if (atomic_dec_and_test(&set->set_refcount))
963                 lov_finish_set(set);
964
965         RETURN(rc);
966 }
967
968 /* The callback for osc_getattr_async that finilizes a request info when a
969  * response is recieved. */
970 static int cb_getattr_update(struct obd_info *oinfo, int rc)
971 {
972         struct lov_request *lovreq;
973         lovreq = container_of(oinfo, struct lov_request, rq_oi);
974         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
975 }
976
977 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
978                          struct lov_request_set **reqset)
979 {
980         struct lov_request_set *set;
981         struct lov_oinfo *loi = NULL;
982         struct lov_obd *lov = &exp->exp_obd->u.lov;
983         int rc = 0, i;
984         ENTRY;
985
986         OBD_ALLOC(set, sizeof(*set));
987         if (set == NULL)
988                 RETURN(-ENOMEM);
989         lov_init_set(set);
990
991         set->set_exp = exp;
992         set->set_oi = oinfo;
993
994         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
995                 struct lov_request *req;
996
997                 loi = oinfo->oi_md->lsm_oinfo[i];
998                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
999                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1000                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1001                         continue;
1002                 }
1003
1004                 OBD_ALLOC(req, sizeof(*req));
1005                 if (req == NULL)
1006                         GOTO(out_set, rc = -ENOMEM);
1007
1008                 req->rq_stripe = i;
1009                 req->rq_idx = loi->loi_ost_idx;
1010
1011                 OBDO_ALLOC(req->rq_oi.oi_oa);
1012                 if (req->rq_oi.oi_oa == NULL) {
1013                         OBD_FREE(req, sizeof(*req));
1014                         GOTO(out_set, rc = -ENOMEM);
1015                 }
1016                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1017                        sizeof(*req->rq_oi.oi_oa));
1018                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1019                 req->rq_oi.oi_cb_up = cb_getattr_update;
1020                 req->rq_rqset = set;
1021
1022                 lov_set_add_req(req, set);
1023         }
1024         if (!set->set_count)
1025                 GOTO(out_set, rc = -EIO);
1026         *reqset = set;
1027         RETURN(rc);
1028 out_set:
1029         lov_fini_getattr_set(set);
1030         RETURN(rc);
1031 }
1032
1033 int lov_fini_destroy_set(struct lov_request_set *set)
1034 {
1035         ENTRY;
1036
1037         if (set == NULL)
1038                 RETURN(0);
1039         LASSERT(set->set_exp);
1040         if (set->set_completes) {
1041                 /* FIXME update qos data here */
1042         }
1043
1044         if (atomic_dec_and_test(&set->set_refcount))
1045                 lov_finish_set(set);
1046
1047         RETURN(0);
1048 }
1049
1050 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1051                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1052                          struct obd_trans_info *oti,
1053                          struct lov_request_set **reqset)
1054 {
1055         struct lov_request_set *set;
1056         struct lov_oinfo *loi = NULL;
1057         struct lov_obd *lov = &exp->exp_obd->u.lov;
1058         int rc = 0, i;
1059         ENTRY;
1060
1061         OBD_ALLOC(set, sizeof(*set));
1062         if (set == NULL)
1063                 RETURN(-ENOMEM);
1064         lov_init_set(set);
1065
1066         set->set_exp = exp;
1067         set->set_oi = oinfo;
1068         set->set_oi->oi_md = lsm;
1069         set->set_oi->oi_oa = src_oa;
1070         set->set_oti = oti;
1071         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1072                 set->set_cookies = oti->oti_logcookies;
1073
1074         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1075                 struct lov_request *req;
1076
1077                 loi = lsm->lsm_oinfo[i];
1078                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
1079                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1080                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1081                         continue;
1082                 }
1083
1084                 OBD_ALLOC(req, sizeof(*req));
1085                 if (req == NULL)
1086                         GOTO(out_set, rc = -ENOMEM);
1087
1088                 req->rq_stripe = i;
1089                 req->rq_idx = loi->loi_ost_idx;
1090
1091                 OBDO_ALLOC(req->rq_oi.oi_oa);
1092                 if (req->rq_oi.oi_oa == NULL) {
1093                         OBD_FREE(req, sizeof(*req));
1094                         GOTO(out_set, rc = -ENOMEM);
1095                 }
1096                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1097                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1098                 lov_set_add_req(req, set);
1099         }
1100         if (!set->set_count)
1101                 GOTO(out_set, rc = -EIO);
1102         *reqset = set;
1103         RETURN(rc);
1104 out_set:
1105         lov_fini_destroy_set(set);
1106         RETURN(rc);
1107 }
1108
1109 int lov_fini_setattr_set(struct lov_request_set *set)
1110 {
1111         int rc = 0;
1112         ENTRY;
1113
1114         if (set == NULL)
1115                 RETURN(0);
1116         LASSERT(set->set_exp);
1117         if (set->set_completes) {
1118                 rc = common_attr_done(set);
1119                 /* FIXME update qos data here */
1120         }
1121
1122         if (atomic_dec_and_test(&set->set_refcount))
1123                 lov_finish_set(set);
1124         RETURN(rc);
1125 }
1126
1127 int lov_update_setattr_set(struct lov_request_set *set,
1128                            struct lov_request *req, int rc)
1129 {
1130         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1131         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1132         ENTRY;
1133
1134         lov_update_set(set, req, rc);
1135
1136         /* grace error on inactive ost */
1137         if (rc && !(lov->lov_tgts[req->rq_idx] && 
1138                     lov->lov_tgts[req->rq_idx]->ltd_active))
1139                 rc = 0;
1140
1141         if (rc == 0) {
1142                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1143                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1144                                 req->rq_oi.oi_oa->o_ctime;
1145                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1146                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1147                                 req->rq_oi.oi_oa->o_mtime;
1148                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1149                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1150                                 req->rq_oi.oi_oa->o_atime;
1151         }
1152
1153         RETURN(rc);
1154 }
1155
1156 /* The callback for osc_setattr_async that finilizes a request info when a
1157  * response is recieved. */
1158 static int cb_setattr_update(struct obd_info *oinfo, int rc)
1159 {
1160         struct lov_request *lovreq;
1161         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1162         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1163 }
1164
1165 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1166                          struct obd_trans_info *oti,
1167                          struct lov_request_set **reqset)
1168 {
1169         struct lov_request_set *set;
1170         struct lov_oinfo *loi = NULL;
1171         struct lov_obd *lov = &exp->exp_obd->u.lov;
1172         int rc = 0, i;
1173         ENTRY;
1174
1175         OBD_ALLOC(set, sizeof(*set));
1176         if (set == NULL)
1177                 RETURN(-ENOMEM);
1178         lov_init_set(set);
1179
1180         set->set_exp = exp;
1181         set->set_oti = oti;
1182         set->set_oi = oinfo;
1183         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1184                 set->set_cookies = oti->oti_logcookies;
1185
1186         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1187                 struct lov_request *req;
1188
1189                 loi = oinfo->oi_md->lsm_oinfo[i];
1190                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1191                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1192                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1193                         continue;
1194                 }
1195
1196                 OBD_ALLOC(req, sizeof(*req));
1197                 if (req == NULL)
1198                         GOTO(out_set, rc = -ENOMEM);
1199                 req->rq_stripe = i;
1200                 req->rq_idx = loi->loi_ost_idx;
1201
1202                 OBDO_ALLOC(req->rq_oi.oi_oa);
1203                 if (req->rq_oi.oi_oa == NULL) {
1204                         OBD_FREE(req, sizeof(*req));
1205                         GOTO(out_set, rc = -ENOMEM);
1206                 }
1207                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1208                        sizeof(*req->rq_oi.oi_oa));
1209                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1210                 req->rq_oi.oi_oa->o_stripe_idx = i;
1211                 req->rq_oi.oi_cb_up = cb_setattr_update;
1212                 req->rq_rqset = set;
1213
1214                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1215                         int off = lov_stripe_offset(oinfo->oi_md,
1216                                                     oinfo->oi_oa->o_size, i,
1217                                                     &req->rq_oi.oi_oa->o_size);
1218
1219                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1220                                 req->rq_oi.oi_oa->o_size--;
1221
1222                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1223                                i, req->rq_oi.oi_oa->o_size,
1224                                oinfo->oi_oa->o_size);
1225                 }
1226                 lov_set_add_req(req, set);
1227         }
1228         if (!set->set_count)
1229                 GOTO(out_set, rc = -EIO);
1230         *reqset = set;
1231         RETURN(rc);
1232 out_set:
1233         lov_fini_setattr_set(set);
1234         RETURN(rc);
1235 }
1236
1237 int lov_fini_punch_set(struct lov_request_set *set)
1238 {
1239         int rc = 0;
1240         ENTRY;
1241
1242         if (set == NULL)
1243                 RETURN(0);
1244         LASSERT(set->set_exp);
1245         if (set->set_completes) {
1246                 rc = -EIO;
1247                 /* FIXME update qos data here */
1248                 if (set->set_success)
1249                         rc = common_attr_done(set);
1250         }
1251
1252         if (atomic_dec_and_test(&set->set_refcount))
1253                 lov_finish_set(set);
1254
1255         RETURN(rc);
1256 }
1257
1258 int lov_update_punch_set(struct lov_request_set *set,
1259                            struct lov_request *req, int rc)
1260 {
1261         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1262         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1263         ENTRY;
1264
1265         lov_update_set(set, req, rc);
1266
1267         /* grace error on inactive ost */
1268         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1269                 rc = 0;
1270
1271         if (rc == 0) {
1272                 lov_stripe_lock(lsm);
1273                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1274                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1275                                 req->rq_oi.oi_oa->o_blocks;
1276                 }
1277
1278                 /* Do we need to update lvb_size here? It needn't because
1279                  * it have been done in ll_truncate(). -jay */
1280                 lov_stripe_unlock(lsm);
1281         }
1282
1283         RETURN(rc);
1284 }
1285
1286 /* The callback for osc_punch that finilizes a request info when a response
1287  * is recieved. */
1288 static int cb_update_punch(struct obd_info *oinfo, int rc)
1289 {
1290         struct lov_request *lovreq;
1291         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1292         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1293 }
1294
1295 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1296                        struct obd_trans_info *oti,
1297                        struct lov_request_set **reqset)
1298 {
1299         struct lov_request_set *set;
1300         struct lov_oinfo *loi = NULL;
1301         struct lov_obd *lov = &exp->exp_obd->u.lov;
1302         int rc = 0, i;
1303         ENTRY;
1304
1305         OBD_ALLOC(set, sizeof(*set));
1306         if (set == NULL)
1307                 RETURN(-ENOMEM);
1308         lov_init_set(set);
1309
1310         set->set_oi = oinfo;
1311         set->set_exp = exp;
1312
1313         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1314                 struct lov_request *req;
1315                 obd_off rs, re;
1316
1317                 loi = oinfo->oi_md->lsm_oinfo[i];
1318                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1319                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1320                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1321                         continue;
1322                 }
1323
1324                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1325                                            oinfo->oi_policy.l_extent.start,
1326                                            oinfo->oi_policy.l_extent.end,
1327                                            &rs, &re))
1328                         continue;
1329
1330                 OBD_ALLOC(req, sizeof(*req));
1331                 if (req == NULL)
1332                         GOTO(out_set, rc = -ENOMEM);
1333                 req->rq_stripe = i;
1334                 req->rq_idx = loi->loi_ost_idx;
1335
1336                 OBDO_ALLOC(req->rq_oi.oi_oa);
1337                 if (req->rq_oi.oi_oa == NULL) {
1338                         OBD_FREE(req, sizeof(*req));
1339                         GOTO(out_set, rc = -ENOMEM);
1340                 }
1341                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1342                        sizeof(*req->rq_oi.oi_oa));
1343                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1344                 req->rq_oi.oi_oa->o_stripe_idx = i;
1345                 req->rq_oi.oi_cb_up = cb_update_punch;
1346                 req->rq_rqset = set;
1347
1348                 req->rq_oi.oi_policy.l_extent.start = rs;
1349                 req->rq_oi.oi_policy.l_extent.end = re;
1350                 req->rq_oi.oi_policy.l_extent.gid = -1;
1351
1352                 lov_set_add_req(req, set);
1353         }
1354         if (!set->set_count)
1355                 GOTO(out_set, rc = -EIO);
1356         *reqset = set;
1357         RETURN(rc);
1358 out_set:
1359         lov_fini_punch_set(set);
1360         RETURN(rc);
1361 }
1362
1363 int lov_fini_sync_set(struct lov_request_set *set)
1364 {
1365         int rc = 0;
1366         ENTRY;
1367
1368         if (set == NULL)
1369                 RETURN(0);
1370         LASSERT(set->set_exp);
1371         if (set->set_completes) {
1372                 if (!set->set_success)
1373                         rc = -EIO;
1374                 /* FIXME update qos data here */
1375         }
1376
1377         if (atomic_dec_and_test(&set->set_refcount))
1378                 lov_finish_set(set);
1379
1380         RETURN(rc);
1381 }
1382
1383 /* The callback for osc_sync that finilizes a request info when a
1384  * response is recieved. */
1385 static int cb_sync_update(struct obd_info *oinfo, int rc)
1386 {
1387         struct lov_request *lovreq;
1388         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1389         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1390 }
1391
1392 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1393                       obd_off start, obd_off end,
1394                       struct lov_request_set **reqset)
1395 {
1396         struct lov_request_set *set;
1397         struct lov_oinfo *loi = NULL;
1398         struct lov_obd *lov = &exp->exp_obd->u.lov;
1399         int rc = 0, i;
1400         ENTRY;
1401
1402         OBD_ALLOC(set, sizeof(*set));
1403         if (set == NULL)
1404                 RETURN(-ENOMEM);
1405         lov_init_set(set);
1406
1407         set->set_exp = exp;
1408         set->set_oi = oinfo;
1409         set->set_oi->oi_md = oinfo->oi_md;
1410         set->set_oi->oi_oa = oinfo->oi_oa;
1411
1412         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1413                 struct lov_request *req;
1414                 obd_off rs, re;
1415
1416                 loi = oinfo->oi_md->lsm_oinfo[i];
1417                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1418                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1419                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1420                         continue;
1421                 }
1422
1423                 if (!lov_stripe_intersects(oinfo->oi_md, i, start,
1424                                            end, &rs, &re))
1425                         continue;
1426
1427                 OBD_ALLOC(req, sizeof(*req));
1428                 if (req == NULL)
1429                         GOTO(out_set, rc = -ENOMEM);
1430                 req->rq_stripe = i;
1431                 req->rq_idx = loi->loi_ost_idx;
1432
1433                 OBDO_ALLOC(req->rq_oi.oi_oa);
1434                 if (req->rq_oi.oi_oa == NULL) {
1435                         OBD_FREE(req, sizeof(*req));
1436                         GOTO(out_set, rc = -ENOMEM);
1437                 }
1438                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1439                        sizeof(*req->rq_oi.oi_oa));
1440                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1441                 req->rq_oi.oi_oa->o_stripe_idx = i;
1442
1443                 req->rq_oi.oi_policy.l_extent.start = rs;
1444                 req->rq_oi.oi_policy.l_extent.end = re;
1445                 req->rq_oi.oi_policy.l_extent.gid = -1;
1446                 req->rq_oi.oi_cb_up = cb_sync_update;
1447                 req->rq_rqset = set;
1448
1449                 lov_set_add_req(req, set);
1450         }
1451         if (!set->set_count)
1452                 GOTO(out_set, rc = -EIO);
1453         *reqset = set;
1454         RETURN(rc);
1455 out_set:
1456         lov_fini_sync_set(set);
1457         RETURN(rc);
1458 }
1459
/* All-ones __u64 value; also the saturation value used by LOV_SUM_MAX. */
#define LOV_U64_MAX ((__u64)~0ULL)

/*
 * Saturating accumulate: tot += add, except that when the sum compares
 * smaller than tot (wrap-around of unsigned operands) tot is set to
 * LOV_U64_MAX instead.  Both arguments are expanded more than once, so
 * they must be free of side effects.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1468
1469 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1470 {
1471         ENTRY;
1472
1473         if (success) {
1474                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1475
1476                 if (osfs->os_files != LOV_U64_MAX)
1477                         do_div(osfs->os_files, expected_stripes);
1478                 if (osfs->os_ffree != LOV_U64_MAX)
1479                         do_div(osfs->os_ffree, expected_stripes);
1480
1481                 spin_lock(&obd->obd_osfs_lock);
1482                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1483                 obd->obd_osfs_age = cfs_time_current_64();
1484                 spin_unlock(&obd->obd_osfs_lock);
1485                 RETURN(0);
1486         }
1487
1488         RETURN(-EIO);
1489 }
1490
1491 int lov_fini_statfs_set(struct lov_request_set *set)
1492 {
1493         int rc = 0;
1494         ENTRY;
1495
1496         if (set == NULL)
1497                 RETURN(0);
1498
1499         if (set->set_completes) {
1500                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1501                                      set->set_success);
1502         }
1503
1504         if (atomic_dec_and_test(&set->set_refcount))
1505                 lov_finish_set(set);
1506
1507         RETURN(rc);
1508 }
1509
1510 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1511                        int success)
1512 {
1513         int shift = 0, quit = 0;
1514         __u64 tmp;
1515
1516         if (success == 0) {
1517                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1518         } else {
1519                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1520                         /* assume all block sizes are always powers of 2 */
1521                         /* get the bits difference */
1522                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1523                         for (shift = 0; shift <= 64; ++shift) {
1524                                 if (tmp & 1) {
1525                                         if (quit)
1526                                                 break;
1527                                         else
1528                                                 quit = 1;
1529                                         shift = 0;
1530                                 }
1531                                 tmp >>= 1;
1532                         }
1533                 }
1534
1535                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1536                         osfs->os_bsize = lov_sfs->os_bsize;
1537
1538                         osfs->os_bfree  >>= shift;
1539                         osfs->os_bavail >>= shift;
1540                         osfs->os_blocks >>= shift;
1541                 } else if (shift != 0) {
1542                         lov_sfs->os_bfree  >>= shift;
1543                         lov_sfs->os_bavail >>= shift;
1544                         lov_sfs->os_blocks >>= shift;
1545                 }
1546 #ifdef MIN_DF
1547                 /* Sandia requested that df (and so, statfs) only
1548                    returned minimal available space on
1549                    a single OST, so people would be able to
1550                    write this much data guaranteed. */
1551                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1552                         /* Presumably if new bavail is smaller,
1553                            new bfree is bigger as well */
1554                         osfs->os_bfree = lov_sfs->os_bfree;
1555                         osfs->os_bavail = lov_sfs->os_bavail;
1556                 }
1557 #else
1558                 osfs->os_bfree += lov_sfs->os_bfree;
1559                 osfs->os_bavail += lov_sfs->os_bavail;
1560 #endif
1561                 osfs->os_blocks += lov_sfs->os_blocks;
1562                 /* XXX not sure about this one - depends on policy.
1563                  *   - could be minimum if we always stripe on all OBDs
1564                  *     (but that would be wrong for any other policy,
1565                  *     if one of the OBDs has no more objects left)
1566                  *   - could be sum if we stripe whole objects
1567                  *   - could be average, just to give a nice number
1568                  *
1569                  * To give a "reasonable" (if not wholly accurate)
1570                  * number, we divide the total number of free objects
1571                  * by expected stripe count (watch out for overflow).
1572                  */
1573                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1574                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1575         }
1576 }
1577
1578 /* The callback for osc_statfs_async that finilizes a request info when a
1579  * response is recieved. */
1580 static int cb_statfs_update(struct obd_info *oinfo, int rc)
1581 {
1582         struct lov_request *lovreq;
1583         struct obd_statfs *osfs, *lov_sfs;
1584         struct obd_device *obd;
1585         struct lov_obd *lov;
1586         int success;
1587         ENTRY;
1588
1589         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1590         lov = &lovreq->rq_rqset->set_obd->u.lov;
1591         obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1592
1593         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1594         lov_sfs = oinfo->oi_osfs;
1595
1596         success = lovreq->rq_rqset->set_success;
1597
1598         /* XXX: the same is done in lov_update_common_set, however
1599            lovset->set_exp is not initialized. */
1600         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1601         if (rc) {
1602                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1603                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1604                         rc = 0;
1605                 RETURN(rc);
1606         }
1607
1608         spin_lock(&obd->obd_osfs_lock);
1609         memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1610         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1611                 obd->obd_osfs_age = cfs_time_current_64();
1612         spin_unlock(&obd->obd_osfs_lock);
1613
1614         lov_update_statfs(osfs, lov_sfs, success);
1615         qos_update(lov);
1616
1617         RETURN(0);
1618 }
1619
1620 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1621                         struct lov_request_set **reqset)
1622 {
1623         struct lov_request_set *set;
1624         struct lov_obd *lov = &obd->u.lov;
1625         int rc = 0, i;
1626         ENTRY;
1627
1628         OBD_ALLOC(set, sizeof(*set));
1629         if (set == NULL)
1630                 RETURN(-ENOMEM);
1631         lov_init_set(set);
1632
1633         set->set_obd = obd;
1634         set->set_oi = oinfo;
1635
1636         /* We only get block data from the OBD */
1637         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1638                 struct lov_request *req;
1639
1640                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
1641                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1642                         continue;
1643                 }
1644
1645                 OBD_ALLOC(req, sizeof(*req));
1646                 if (req == NULL)
1647                         GOTO(out_set, rc = -ENOMEM);
1648
1649                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1650                 if (req->rq_oi.oi_osfs == NULL) {
1651                         OBD_FREE(req, sizeof(*req));
1652                         GOTO(out_set, rc = -ENOMEM);
1653                 }
1654
1655                 req->rq_idx = i;
1656                 req->rq_oi.oi_cb_up = cb_statfs_update;
1657                 req->rq_oi.oi_flags = oinfo->oi_flags;
1658                 req->rq_rqset = set;
1659
1660                 lov_set_add_req(req, set);
1661         }
1662         if (!set->set_count)
1663                 GOTO(out_set, rc = -EIO);
1664         *reqset = set;
1665         RETURN(rc);
1666 out_set:
1667         lov_fini_statfs_set(set);
1668         RETURN(rc);
1669 }