Whamcloud - gitweb
- update from 1_5
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29
30 #ifdef __KERNEL__
31 #include <libcfs/libcfs.h>
32 #else
33 #include <liblustre.h>
34 #endif
35
36 #include <obd_class.h>
37 #include <obd_lov.h>
38 #include <lustre/lustre_idl.h>
39
40 #include "lov_internal.h"
41
42 static void lov_init_set(struct lov_request_set *set)
43 {
44         set->set_count = 0;
45         set->set_completes = 0;
46         set->set_success = 0;
47         set->set_cookies = 0;
48         CFS_INIT_LIST_HEAD(&set->set_list);
49         atomic_set(&set->set_refcount, 1);
50 }
51
52 static void lov_finish_set(struct lov_request_set *set)
53 {
54         struct list_head *pos, *n;
55         ENTRY;
56
57         LASSERT(set);
58         list_for_each_safe(pos, n, &set->set_list) {
59                 struct lov_request *req = list_entry(pos, struct lov_request,
60                                                      rq_link);
61                 list_del_init(&req->rq_link);
62
63                 if (req->rq_oi.oi_oa)
64                         obdo_free(req->rq_oi.oi_oa);
65                 if (req->rq_oi.oi_md)
66                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
67                 if (req->rq_oi.oi_osfs)
68                         OBD_FREE(req->rq_oi.oi_osfs,
69                                  sizeof(*req->rq_oi.oi_osfs));
70                 OBD_FREE(req, sizeof(*req));
71         }
72
73         if (set->set_pga) {
74                 int len = set->set_oabufs * sizeof(*set->set_pga);
75                 OBD_FREE(set->set_pga, len);
76         }
77         if (set->set_lockh)
78                 lov_llh_put(set->set_lockh);
79
80         OBD_FREE(set, sizeof(*set));
81         EXIT;
82 }
83
84 void lov_update_set(struct lov_request_set *set,
85                     struct lov_request *req, int rc)
86 {
87         req->rq_complete = 1;
88         req->rq_rc = rc;
89
90         set->set_completes++;
91         if (rc == 0)
92                 set->set_success++;
93 }
94
95 int lov_update_common_set(struct lov_request_set *set,
96                           struct lov_request *req, int rc)
97 {
98         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
99         ENTRY;
100
101         lov_update_set(set, req, rc);
102
103         /* grace error on inactive ost */
104         if (rc && !(lov->lov_tgts[req->rq_idx] && 
105                     lov->lov_tgts[req->rq_idx]->ltd_active))
106                 rc = 0;
107
108         /* FIXME in raid1 regime, should return 0 */
109         RETURN(rc);
110 }
111
112 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
113 {
114         list_add_tail(&req->rq_link, &set->set_list);
115         set->set_count++;
116 }
117
118 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
119 {
120         struct lov_request_set *set = req->rq_rqset;
121         struct lustre_handle *lov_lockhp;
122         struct lov_oinfo *loi;
123         ENTRY;
124
125         LASSERT(set != NULL);
126         LASSERT(set->set_oi != NULL);
127
128         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
129         loi = &set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
130
131         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
132          * and that copy can be arbitrarily out of date.
133          *
134          * The LOV API is due for a serious rewriting anyways, and this
135          * can be addressed then. */
136
137         if (rc == ELDLM_OK) {
138                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
139                 __u64 tmp;
140
141                 LASSERT(lock != NULL);
142                 lov_stripe_lock(set->set_oi->oi_md);
143                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo->loi_lvb;
144                 tmp = loi->loi_lvb.lvb_size;
145                 /* Extend KMS up to the end of this lock and no further
146                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
147                 if (tmp > lock->l_policy_data.l_extent.end)
148                         tmp = lock->l_policy_data.l_extent.end + 1;
149                 if (tmp >= loi->loi_kms) {
150                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
151                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
152                         loi->loi_kms = tmp;
153                         loi->loi_kms_valid = 1;
154                 } else {
155                         LDLM_DEBUG(lock, "lock acquired, setting rss="
156                                    LPU64"; leaving kms="LPU64", end="LPU64,
157                                    loi->loi_lvb.lvb_size, loi->loi_kms,
158                                    lock->l_policy_data.l_extent.end);
159                 }
160                 lov_stripe_unlock(set->set_oi->oi_md);
161                 ldlm_lock_allow_match(lock);
162                 LDLM_LOCK_PUT(lock);
163         } else if ((rc == ELDLM_LOCK_ABORTED) &&
164                    (set->set_ei->ei_flags & LDLM_FL_HAS_INTENT)) {
165                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
166                 lov_stripe_lock(set->set_oi->oi_md);
167                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo->loi_lvb;
168                 lov_stripe_unlock(set->set_oi->oi_md);
169                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
170                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
171                 rc = ELDLM_OK;
172         } else {
173                 struct obd_export *exp = set->set_exp;
174                 struct lov_obd *lov = &exp->exp_obd->u.lov;
175
176                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
177                 if (lov->lov_tgts[req->rq_idx] && 
178                     lov->lov_tgts[req->rq_idx]->ltd_active) {
179                         CERROR("error: enqueue objid "LPX64" subobj "
180                                 LPX64" on OST idx %d: rc = %d\n",
181                                 set->set_oi->oi_md->lsm_object_id,
182                                 loi->loi_id, loi->loi_ost_idx, rc);
183                 } else {
184                         rc = ELDLM_OK;
185                 }
186         }
187         lov_update_set(set, req, rc);
188         RETURN(rc);
189 }
190
191 /* The callback for osc_enqueue that updates lov info for every OSC request. */
192 static int cb_update_enqueue(struct obd_info *oinfo, int rc)
193 {
194         struct obd_enqueue_info *einfo;
195         struct lov_request *lovreq;
196
197         lovreq = container_of(oinfo, struct lov_request, rq_oi);
198         einfo = lovreq->rq_rqset->set_ei;
199         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
200 }
201
202 static int enqueue_done(struct lov_request_set *set, __u32 mode)
203 {
204         struct lov_request *req;
205         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
206         int rc = 0;
207         ENTRY;
208
209         LASSERT(set->set_completes);
210         /* enqueue/match success, just return */
211         if (set->set_completes == set->set_success)
212                 RETURN(0);
213
214         /* cancel enqueued/matched locks */
215         list_for_each_entry(req, &set->set_list, rq_link) {
216                 struct lustre_handle *lov_lockhp;
217
218                 if (!req->rq_complete || req->rq_rc)
219                         continue;
220
221                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
222                 LASSERT(lov_lockhp);
223                 if (!lustre_handle_is_used(lov_lockhp))
224                         continue;
225
226                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
227                                 req->rq_oi.oi_md, mode, lov_lockhp);
228                 if (rc && lov->lov_tgts[req->rq_idx] &&
229                     lov->lov_tgts[req->rq_idx]->ltd_active)
230                         CERROR("cancelling obdjid "LPX64" on OST "
231                                "idx %d error: rc = %d\n",
232                                req->rq_oi.oi_md->lsm_object_id,
233                                req->rq_idx, rc);
234         }
235         lov_llh_put(set->set_lockh);
236         RETURN(rc);
237 }
238
239 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
240 {
241         int rc = 0;
242         ENTRY;
243
244         if (set == NULL)
245                 RETURN(0);
246         LASSERT(set->set_exp);
247         /* Do enqueue_done only for sync requests and if any request
248            succeeded. */
249         if (!set->set_ei->ei_rqset && set->set_completes)
250                 rc = enqueue_done(set, mode);
251         else
252                 lov_llh_put(set->set_lockh);
253
254         if (atomic_dec_and_test(&set->set_refcount))
255                 lov_finish_set(set);
256
257         RETURN(rc);
258 }
259
260 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
261                          struct obd_enqueue_info *einfo,
262                          struct lov_request_set **reqset)
263 {
264         struct lov_obd *lov = &exp->exp_obd->u.lov;
265         struct lov_request_set *set;
266         int i, rc = 0;
267         struct lov_oinfo *loi;
268         ENTRY;
269
270         OBD_ALLOC(set, sizeof(*set));
271         if (set == NULL)
272                 RETURN(-ENOMEM);
273         lov_init_set(set);
274
275         set->set_exp = exp;
276         set->set_oi = oinfo;
277         set->set_ei = einfo;
278         set->set_lockh = lov_llh_new(oinfo->oi_md);
279         if (set->set_lockh == NULL)
280                 GOTO(out_set, rc = -ENOMEM);
281         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
282
283         loi = oinfo->oi_md->lsm_oinfo;
284         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
285                 struct lov_request *req;
286                 obd_off start, end;
287
288                 if (!lov_stripe_intersects(oinfo->oi_md, i,
289                                            oinfo->oi_policy.l_extent.start,
290                                            oinfo->oi_policy.l_extent.end,
291                                            &start, &end))
292                         continue;
293
294                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
295                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
296                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
297                         continue;
298                 }
299
300                 OBD_ALLOC(req, sizeof(*req));
301                 if (req == NULL)
302                         GOTO(out_set, rc = -ENOMEM);
303
304                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
305                         sizeof(struct lov_oinfo);
306                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
307                 if (req->rq_oi.oi_md == NULL) {
308                         OBD_FREE(req, sizeof(*req));
309                         GOTO(out_set, rc = -ENOMEM);
310                 }
311
312                 req->rq_rqset = set;
313                 /* Set lov request specific parameters. */
314                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
315                 req->rq_oi.oi_cb_up = cb_update_enqueue;
316
317                 LASSERT(req->rq_oi.oi_lockh);
318
319                 req->rq_oi.oi_policy.l_extent.gid =
320                         oinfo->oi_policy.l_extent.gid;
321                 req->rq_oi.oi_policy.l_extent.start = start;
322                 req->rq_oi.oi_policy.l_extent.end = end;
323
324                 req->rq_idx = loi->loi_ost_idx;
325                 req->rq_stripe = i;
326
327                 /* XXX LOV STACKING: submd should be from the subobj */
328                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
329                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
330                 req->rq_oi.oi_md->lsm_stripe_count = 0;
331                 req->rq_oi.oi_md->lsm_oinfo->loi_kms_valid =
332                         loi->loi_kms_valid;
333                 req->rq_oi.oi_md->lsm_oinfo->loi_kms = loi->loi_kms;
334                 req->rq_oi.oi_md->lsm_oinfo->loi_lvb = loi->loi_lvb;
335
336                 lov_set_add_req(req, set);
337         }
338         if (!set->set_count)
339                 GOTO(out_set, rc = -EIO);
340         *reqset = set;
341         RETURN(0);
342 out_set:
343         lov_fini_enqueue_set(set, einfo->ei_mode);
344         RETURN(rc);
345 }
346
347 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
348                          int rc)
349 {
350         int ret = rc;
351         ENTRY;
352
353         if (rc == 1)
354                 ret = 0;
355         lov_update_set(set, req, ret);
356         RETURN(rc);
357 }
358
359 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
360 {
361         int rc = 0;
362         ENTRY;
363
364         if (set == NULL)
365                 RETURN(0);
366         LASSERT(set->set_exp);
367         if (set->set_completes) {
368                 if (set->set_count == set->set_success &&
369                     flags & LDLM_FL_TEST_LOCK)
370                         lov_llh_put(set->set_lockh);
371                 rc = enqueue_done(set, mode);
372         } else {
373                 lov_llh_put(set->set_lockh);
374         }
375
376         if (atomic_dec_and_test(&set->set_refcount))
377                 lov_finish_set(set);
378
379         RETURN(rc);
380 }
381
382 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
383                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
384                        __u32 mode, struct lustre_handle *lockh,
385                        struct lov_request_set **reqset)
386 {
387         struct lov_obd *lov = &exp->exp_obd->u.lov;
388         struct lov_request_set *set;
389         int i, rc = 0;
390         struct lov_oinfo *loi;
391         ENTRY;
392
393         OBD_ALLOC(set, sizeof(*set));
394         if (set == NULL)
395                 RETURN(-ENOMEM);
396         lov_init_set(set);
397
398         set->set_exp = exp;
399         set->set_oi = oinfo;
400         set->set_oi->oi_md = lsm;
401         set->set_lockh = lov_llh_new(lsm);
402         if (set->set_lockh == NULL)
403                 GOTO(out_set, rc = -ENOMEM);
404         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
405
406         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
407                 struct lov_request *req;
408                 obd_off start, end;
409
410                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
411                                            policy->l_extent.end, &start, &end))
412                         continue;
413
414                 /* FIXME raid1 should grace this error */
415                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
416                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
417                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
418                         GOTO(out_set, rc = -EIO);
419                 }
420
421                 OBD_ALLOC(req, sizeof(*req));
422                 if (req == NULL)
423                         GOTO(out_set, rc = -ENOMEM);
424
425                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
426                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
427                 if (req->rq_oi.oi_md == NULL) {
428                         OBD_FREE(req, sizeof(*req));
429                         GOTO(out_set, rc = -ENOMEM);
430                 }
431
432                 req->rq_oi.oi_policy.l_extent.start = start;
433                 req->rq_oi.oi_policy.l_extent.end = end;
434                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
435
436                 req->rq_idx = loi->loi_ost_idx;
437                 req->rq_stripe = i;
438
439                 /* XXX LOV STACKING: submd should be from the subobj */
440                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
441                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
442                 req->rq_oi.oi_md->lsm_stripe_count = 0;
443
444                 lov_set_add_req(req, set);
445         }
446         if (!set->set_count)
447                 GOTO(out_set, rc = -EIO);
448         *reqset = set;
449         RETURN(rc);
450 out_set:
451         lov_fini_match_set(set, mode, 0);
452         RETURN(rc);
453 }
454
455 int lov_fini_cancel_set(struct lov_request_set *set)
456 {
457         int rc = 0;
458         ENTRY;
459
460         if (set == NULL)
461                 RETURN(0);
462
463         LASSERT(set->set_exp);
464         if (set->set_lockh)
465                 lov_llh_put(set->set_lockh);
466
467         if (atomic_dec_and_test(&set->set_refcount))
468                 lov_finish_set(set);
469
470         RETURN(rc);
471 }
472
473 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
474                         struct lov_stripe_md *lsm, __u32 mode,
475                         struct lustre_handle *lockh,
476                         struct lov_request_set **reqset)
477 {
478         struct lov_request_set *set;
479         int i, rc = 0;
480         struct lov_oinfo *loi;
481         ENTRY;
482
483         OBD_ALLOC(set, sizeof(*set));
484         if (set == NULL)
485                 RETURN(-ENOMEM);
486         lov_init_set(set);
487
488         set->set_exp = exp;
489         set->set_oi = oinfo;
490         set->set_oi->oi_md = lsm;
491         set->set_lockh = lov_handle2llh(lockh);
492         if (set->set_lockh == NULL) {
493                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
494                 GOTO(out_set, rc = -EINVAL);
495         }
496         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
497
498         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
499                 struct lov_request *req;
500                 struct lustre_handle *lov_lockhp;
501
502                 lov_lockhp = set->set_lockh->llh_handles + i;
503                 if (!lustre_handle_is_used(lov_lockhp)) {
504                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
505                                loi->loi_ost_idx, loi->loi_id);
506                         continue;
507                 }
508
509                 OBD_ALLOC(req, sizeof(*req));
510                 if (req == NULL)
511                         GOTO(out_set, rc = -ENOMEM);
512
513                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
514                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
515                 if (req->rq_oi.oi_md == NULL) {
516                         OBD_FREE(req, sizeof(*req));
517                         GOTO(out_set, rc = -ENOMEM);
518                 }
519
520                 req->rq_idx = loi->loi_ost_idx;
521                 req->rq_stripe = i;
522
523                 /* XXX LOV STACKING: submd should be from the subobj */
524                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
525                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
526                 req->rq_oi.oi_md->lsm_stripe_count = 0;
527
528                 lov_set_add_req(req, set);
529         }
530         if (!set->set_count)
531                 GOTO(out_set, rc = -EIO);
532         *reqset = set;
533         RETURN(rc);
534 out_set:
535         lov_fini_cancel_set(set);
536         RETURN(rc);
537 }
538
539 static int create_done(struct obd_export *exp, struct lov_request_set *set,
540                        struct lov_stripe_md **lsmp)
541 {
542         struct lov_obd *lov = &exp->exp_obd->u.lov;
543         struct obd_trans_info *oti = set->set_oti;
544         struct obdo *src_oa = set->set_oi->oi_oa;
545         struct lov_request *req;
546         struct obdo *ret_oa = NULL;
547         int attrset = 0, rc = 0;
548         ENTRY;
549
550         LASSERT(set->set_completes);
551
552         /* try alloc objects on other osts if osc_create fails for
553          * exceptions: RPC failure, ENOSPC, etc */
554         if (set->set_count != set->set_success) {
555                 list_for_each_entry (req, &set->set_list, rq_link) {
556                         if (req->rq_rc == 0)
557                                 continue;
558
559                         set->set_completes--;
560                         req->rq_complete = 0;
561
562                         rc = qos_remedy_create(set, req);
563                         lov_update_create_set(set, req, rc);
564
565                         if (rc)
566                                 break;
567                 }
568         }
569
570         /* no successful creates */
571         if (set->set_success == 0)
572                 GOTO(cleanup, rc);
573
574         /* If there was an explicit stripe set, fail.  Otherwise, we
575          * got some objects and that's not bad. */
576         if (set->set_count != set->set_success) {
577                 if (*lsmp)
578                         GOTO(cleanup, rc);
579                 set->set_count = set->set_success;
580                 qos_shrink_lsm(set);
581         }
582
583         ret_oa = obdo_alloc();
584         if (ret_oa == NULL)
585                 GOTO(cleanup, rc = -ENOMEM);
586
587         list_for_each_entry(req, &set->set_list, rq_link) {
588                 if (!req->rq_complete || req->rq_rc)
589                         continue;
590                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
591                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
592                                 req->rq_stripe, &attrset);
593         }
594         if (src_oa->o_valid & OBD_MD_FLSIZE &&
595             ret_oa->o_size != src_oa->o_size) {
596                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
597                        src_oa->o_size, ret_oa->o_size);
598                 LBUG();
599         }
600         ret_oa->o_id = src_oa->o_id;
601         ret_oa->o_gr = src_oa->o_gr;
602         ret_oa->o_valid |= OBD_MD_FLGROUP;
603         memcpy(src_oa, ret_oa, sizeof(*src_oa));
604         obdo_free(ret_oa);
605
606         *lsmp = set->set_oi->oi_md;
607         GOTO(done, rc = 0);
608
609 cleanup:
610         list_for_each_entry(req, &set->set_list, rq_link) {
611                 struct obd_export *sub_exp;
612                 int err = 0;
613
614                 if (!req->rq_complete || req->rq_rc)
615                         continue;
616
617                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
618                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
619                 if (err)
620                         CERROR("Failed to uncreate objid "LPX64" subobj "
621                                LPX64" on OST idx %d: rc = %d\n",
622                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
623                                req->rq_idx, rc);
624         }
625         if (*lsmp == NULL)
626                 obd_free_memmd(exp, &set->set_oi->oi_md);
627 done:
628         if (oti && set->set_cookies) {
629                 oti->oti_logcookies = set->set_cookies;
630                 if (!set->set_cookie_sent) {
631                         oti_free_cookies(oti);
632                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
633                 } else {
634                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
635                 }
636         }
637         RETURN(rc);
638 }
639
640 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
641 {
642         int rc = 0;
643         ENTRY;
644
645         if (set == NULL)
646                 RETURN(0);
647         LASSERT(set->set_exp);
648         if (set->set_completes)
649                 rc = create_done(set->set_exp, set, lsmp);
650
651         if (atomic_dec_and_test(&set->set_refcount))
652                 lov_finish_set(set);
653
654         RETURN(rc);
655 }
656
657 int lov_update_create_set(struct lov_request_set *set,
658                           struct lov_request *req, int rc)
659 {
660         struct obd_trans_info *oti = set->set_oti;
661         struct lov_stripe_md *lsm = set->set_oi->oi_md;
662         struct lov_oinfo *loi;
663         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
664         ENTRY;
665
666         req->rq_stripe = set->set_success;
667         loi = &lsm->lsm_oinfo[req->rq_stripe];
668
669         if (rc && lov->lov_tgts[req->rq_idx] &&
670             lov->lov_tgts[req->rq_idx]->ltd_active) {
671                 CERROR("error creating fid "LPX64" sub-object"
672                        " on OST idx %d/%d: rc = %d\n",
673                        set->set_oi->oi_oa->o_id, req->rq_idx,
674                        lsm->lsm_stripe_count, rc);
675                 if (rc > 0) {
676                         CERROR("obd_create returned invalid err %d\n", rc);
677                         rc = -EIO;
678                 }
679         }
680         lov_update_set(set, req, rc);
681         if (rc)
682                 RETURN(rc);
683
684         if (oti && oti->oti_objid)
685                 oti->oti_objid[req->rq_idx] = req->rq_oi.oi_oa->o_id;
686
687         loi->loi_id = req->rq_oi.oi_oa->o_id;
688         loi->loi_gr = req->rq_oi.oi_oa->o_gr;
689         loi->loi_ost_idx = req->rq_idx;
690         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
691                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
692         loi_init(loi);
693
694         if (oti && set->set_cookies)
695                 ++oti->oti_logcookies;
696         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
697                 set->set_cookie_sent++;
698
699         RETURN(0);
700 }
701
702 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
703                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
704                         struct obd_trans_info *oti,
705                         struct lov_request_set **reqset)
706 {
707         struct lov_request_set *set;
708         int rc = 0;
709         ENTRY;
710
711         OBD_ALLOC(set, sizeof(*set));
712         if (set == NULL)
713                 RETURN(-ENOMEM);
714         lov_init_set(set);
715
716         set->set_exp = exp;
717         set->set_oi = oinfo;
718         set->set_oi->oi_md = *lsmp;
719         set->set_oi->oi_oa = src_oa;
720         set->set_oti = oti;
721
722         rc = qos_prep_create(exp, set);
723         if (rc)
724                 lov_fini_create_set(set, lsmp);
725         else
726                 *reqset = set;
727         RETURN(rc);
728 }
729
730 static int common_attr_done(struct lov_request_set *set)
731 {
732         struct list_head *pos;
733         struct lov_request *req;
734         struct obdo *tmp_oa;
735         int rc = 0, attrset = 0;
736         ENTRY;
737
738         LASSERT(set->set_oi != NULL);
739
740         if (set->set_oi->oi_oa == NULL)
741                 RETURN(0);
742
743         if (!set->set_success)
744                 RETURN(-EIO);
745
746         tmp_oa = obdo_alloc();
747         if (tmp_oa == NULL)
748                 GOTO(out, rc = -ENOMEM);
749
750         list_for_each (pos, &set->set_list) {
751                 req = list_entry(pos, struct lov_request, rq_link);
752
753                 if (!req->rq_complete || req->rq_rc)
754                         continue;
755                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
756                         continue;
757                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
758                                 req->rq_oi.oi_oa->o_valid,
759                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
760         }
761         if (!attrset) {
762                 CERROR("No stripes had valid attrs\n");
763                 rc = -EIO;
764         }
765         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
766         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
767 out:
768         if (tmp_oa)
769                 obdo_free(tmp_oa);
770         RETURN(rc);
771
772 }
773
774 static int brw_done(struct lov_request_set *set)
775 {
776         struct lov_stripe_md *lsm = set->set_oi->oi_md;
777         struct lov_oinfo     *loi = NULL;
778         struct list_head *pos;
779         struct lov_request *req;
780         ENTRY;
781
782         list_for_each (pos, &set->set_list) {
783                 req = list_entry(pos, struct lov_request, rq_link);
784
785                 if (!req->rq_complete || req->rq_rc)
786                         continue;
787
788                 loi = &lsm->lsm_oinfo[req->rq_stripe];
789
790                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
791                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
792         }
793
794         RETURN(0);
795 }
796
797 int lov_fini_brw_set(struct lov_request_set *set)
798 {
799         int rc = 0;
800         ENTRY;
801
802         if (set == NULL)
803                 RETURN(0);
804         LASSERT(set->set_exp);
805         if (set->set_completes) {
806                 rc = brw_done(set);
807                 /* FIXME update qos data here */
808         }
809         if (atomic_dec_and_test(&set->set_refcount))
810                 lov_finish_set(set);
811
812         RETURN(rc);
813 }
814
815 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
816                      obd_count oa_bufs, struct brw_page *pga,
817                      struct obd_trans_info *oti,
818                      struct lov_request_set **reqset)
819 {
820         struct {
821                 obd_count       index;
822                 obd_count       count;
823                 obd_count       off;
824         } *info = NULL;
825         struct lov_request_set *set;
826         struct lov_oinfo *loi = NULL;
827         struct lov_obd *lov = &exp->exp_obd->u.lov;
828         int rc = 0, i, shift;
829         ENTRY;
830
831         OBD_ALLOC(set, sizeof(*set));
832         if (set == NULL)
833                 RETURN(-ENOMEM);
834         lov_init_set(set);
835
836         set->set_exp = exp;
837         set->set_oti = oti;
838         set->set_oi = oinfo;
839         set->set_oabufs = oa_bufs;
840         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
841         if (!set->set_pga)
842                 GOTO(out, rc = -ENOMEM);
843
844         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
845         if (!info)
846                 GOTO(out, rc = -ENOMEM);
847
848         /* calculate the page count for each stripe */
849         for (i = 0; i < oa_bufs; i++) {
850                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
851                 info[stripe].count++;
852         }
853
854         /* alloc and initialize lov request */
855         shift = 0;
856         for (i = 0, loi = oinfo->oi_md->lsm_oinfo;
857              i < oinfo->oi_md->lsm_stripe_count; i++, loi++){
858                 struct lov_request *req;
859
860                 if (info[i].count == 0)
861                         continue;
862
863                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
864                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
865                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
866                         GOTO(out, rc = -EIO);
867                 }
868
869                 OBD_ALLOC(req, sizeof(*req));
870                 if (req == NULL)
871                         GOTO(out, rc = -ENOMEM);
872
873                 req->rq_oi.oi_oa = obdo_alloc();
874                 if (req->rq_oi.oi_oa == NULL) {
875                         OBD_FREE(req, sizeof(*req));
876                         GOTO(out, rc = -ENOMEM);
877                 }
878
879                 if (oinfo->oi_oa) {
880                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
881                                sizeof(*req->rq_oi.oi_oa));
882                 }
883                 req->rq_oi.oi_oa->o_id = loi->loi_id;
884                 req->rq_oi.oi_oa->o_stripe_idx = i;
885
886                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
887                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
888                 if (req->rq_oi.oi_md == NULL) {
889                         obdo_free(req->rq_oi.oi_oa);
890                         OBD_FREE(req, sizeof(*req));
891                         GOTO(out, rc = -ENOMEM);
892                 }
893
894                 req->rq_idx = loi->loi_ost_idx;
895                 req->rq_stripe = i;
896
897                 /* XXX LOV STACKING */
898                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
899                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
900                 req->rq_oabufs = info[i].count;
901                 req->rq_pgaidx = shift;
902                 shift += req->rq_oabufs;
903
904                 /* remember the index for sort brw_page array */
905                 info[i].index = req->rq_pgaidx;
906
907                 lov_set_add_req(req, set);
908         }
909         if (!set->set_count)
910                 GOTO(out, rc = -EIO);
911
912         /* rotate & sort the brw_page array */
913         for (i = 0; i < oa_bufs; i++) {
914                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
915
916                 shift = info[stripe].index + info[stripe].off;
917                 LASSERT(shift < oa_bufs);
918                 set->set_pga[shift] = pga[i];
919                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
920                                   &set->set_pga[shift].off);
921                 info[stripe].off++;
922         }
923 out:
924         if (info)
925                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
926
927         if (rc == 0)
928                 *reqset = set;
929         else
930                 lov_fini_brw_set(set);
931
932         RETURN(rc);
933 }
934
935 int lov_fini_getattr_set(struct lov_request_set *set)
936 {
937         int rc = 0;
938         ENTRY;
939
940         if (set == NULL)
941                 RETURN(0);
942         LASSERT(set->set_exp);
943         if (set->set_completes)
944                 rc = common_attr_done(set);
945
946         if (atomic_dec_and_test(&set->set_refcount))
947                 lov_finish_set(set);
948
949         RETURN(rc);
950 }
951
952 /* The callback for osc_getattr_async that finilizes a request info when a
953  * response is recieved. */
954 static int cb_getattr_update(struct obd_info *oinfo, int rc)
955 {
956         struct lov_request *lovreq;
957         lovreq = container_of(oinfo, struct lov_request, rq_oi);
958         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
959 }
960
961 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
962                          struct lov_request_set **reqset)
963 {
964         struct lov_request_set *set;
965         struct lov_oinfo *loi = NULL;
966         struct lov_obd *lov = &exp->exp_obd->u.lov;
967         int rc = 0, i;
968         ENTRY;
969
970         OBD_ALLOC(set, sizeof(*set));
971         if (set == NULL)
972                 RETURN(-ENOMEM);
973         lov_init_set(set);
974
975         set->set_exp = exp;
976         set->set_oi = oinfo;
977
978         loi = oinfo->oi_md->lsm_oinfo;
979         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
980                 struct lov_request *req;
981
982                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
983                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
984                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
985                         continue;
986                 }
987
988                 OBD_ALLOC(req, sizeof(*req));
989                 if (req == NULL)
990                         GOTO(out_set, rc = -ENOMEM);
991
992                 req->rq_stripe = i;
993                 req->rq_idx = loi->loi_ost_idx;
994
995                 req->rq_oi.oi_oa = obdo_alloc();
996                 if (req->rq_oi.oi_oa == NULL) {
997                         OBD_FREE(req, sizeof(*req));
998                         GOTO(out_set, rc = -ENOMEM);
999                 }
1000                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1001                        sizeof(*req->rq_oi.oi_oa));
1002                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1003                 req->rq_oi.oi_cb_up = cb_getattr_update;
1004
1005                 lov_set_add_req(req, set);
1006         }
1007         if (!set->set_count)
1008                 GOTO(out_set, rc = -EIO);
1009         *reqset = set;
1010         RETURN(rc);
1011 out_set:
1012         lov_fini_getattr_set(set);
1013         RETURN(rc);
1014 }
1015
1016 int lov_fini_destroy_set(struct lov_request_set *set)
1017 {
1018         ENTRY;
1019
1020         if (set == NULL)
1021                 RETURN(0);
1022         LASSERT(set->set_exp);
1023         if (set->set_completes) {
1024                 /* FIXME update qos data here */
1025         }
1026
1027         if (atomic_dec_and_test(&set->set_refcount))
1028                 lov_finish_set(set);
1029
1030         RETURN(0);
1031 }
1032
1033 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1034                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1035                          struct obd_trans_info *oti,
1036                          struct lov_request_set **reqset)
1037 {
1038         struct lov_request_set *set;
1039         struct lov_oinfo *loi = NULL;
1040         struct lov_obd *lov = &exp->exp_obd->u.lov;
1041         int rc = 0, i;
1042         ENTRY;
1043
1044         OBD_ALLOC(set, sizeof(*set));
1045         if (set == NULL)
1046                 RETURN(-ENOMEM);
1047         lov_init_set(set);
1048
1049         set->set_exp = exp;
1050         set->set_oi = oinfo;
1051         set->set_oi->oi_md = lsm;
1052         set->set_oi->oi_oa = src_oa;
1053         set->set_oti = oti;
1054         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1055                 set->set_cookies = oti->oti_logcookies;
1056
1057         loi = lsm->lsm_oinfo;
1058         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1059                 struct lov_request *req;
1060
1061                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
1062                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1063                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1064                         continue;
1065                 }
1066
1067                 OBD_ALLOC(req, sizeof(*req));
1068                 if (req == NULL)
1069                         GOTO(out_set, rc = -ENOMEM);
1070
1071                 req->rq_stripe = i;
1072                 req->rq_idx = loi->loi_ost_idx;
1073
1074                 req->rq_oi.oi_oa = obdo_alloc();
1075                 if (req->rq_oi.oi_oa == NULL) {
1076                         OBD_FREE(req, sizeof(*req));
1077                         GOTO(out_set, rc = -ENOMEM);
1078                 }
1079                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1080                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1081                 lov_set_add_req(req, set);
1082         }
1083         if (!set->set_count)
1084                 GOTO(out_set, rc = -EIO);
1085         *reqset = set;
1086         RETURN(rc);
1087 out_set:
1088         lov_fini_destroy_set(set);
1089         RETURN(rc);
1090 }
1091
1092 int lov_fini_setattr_set(struct lov_request_set *set)
1093 {
1094         int rc = 0;
1095         ENTRY;
1096
1097         if (set == NULL)
1098                 RETURN(0);
1099         LASSERT(set->set_exp);
1100         if (set->set_completes) {
1101                 rc = common_attr_done(set);
1102                 /* FIXME update qos data here */
1103         }
1104
1105         if (atomic_dec_and_test(&set->set_refcount))
1106                 lov_finish_set(set);
1107         RETURN(rc);
1108 }
1109
1110 int lov_update_setattr_set(struct lov_request_set *set,
1111                            struct lov_request *req, int rc)
1112 {
1113         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1114         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1115         ENTRY;
1116
1117         lov_update_set(set, req, rc);
1118
1119         /* grace error on inactive ost */
1120         if (rc && !(lov->lov_tgts[req->rq_idx] && 
1121                     lov->lov_tgts[req->rq_idx]->ltd_active))
1122                 rc = 0;
1123
1124         if (rc == 0) {
1125                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1126                         lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_ctime =
1127                                 req->rq_oi.oi_oa->o_ctime;
1128                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1129                         lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_mtime =
1130                                 req->rq_oi.oi_oa->o_mtime;
1131                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1132                         lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_atime =
1133                                 req->rq_oi.oi_oa->o_atime;
1134         }
1135
1136         RETURN(rc);
1137 }
1138
1139 /* The callback for osc_setattr_async that finilizes a request info when a
1140  * response is recieved. */
1141 static int cb_setattr_update(struct obd_info *oinfo, int rc)
1142 {
1143         struct lov_request *lovreq;
1144         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1145         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1146 }
1147
1148 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1149                          struct obd_trans_info *oti,
1150                          struct lov_request_set **reqset)
1151 {
1152         struct lov_request_set *set;
1153         struct lov_oinfo *loi = NULL;
1154         struct lov_obd *lov = &exp->exp_obd->u.lov;
1155         int rc = 0, i;
1156         ENTRY;
1157
1158         OBD_ALLOC(set, sizeof(*set));
1159         if (set == NULL)
1160                 RETURN(-ENOMEM);
1161         lov_init_set(set);
1162
1163         set->set_exp = exp;
1164         set->set_oti = oti;
1165         set->set_oi = oinfo;
1166         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1167                 set->set_cookies = oti->oti_logcookies;
1168
1169         loi = oinfo->oi_md->lsm_oinfo;
1170         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
1171                 struct lov_request *req;
1172
1173                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1174                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1175                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1176                         continue;
1177                 }
1178
1179                 OBD_ALLOC(req, sizeof(*req));
1180                 if (req == NULL)
1181                         GOTO(out_set, rc = -ENOMEM);
1182                 req->rq_stripe = i;
1183                 req->rq_idx = loi->loi_ost_idx;
1184
1185                 req->rq_oi.oi_oa = obdo_alloc();
1186                 if (req->rq_oi.oi_oa == NULL) {
1187                         OBD_FREE(req, sizeof(*req));
1188                         GOTO(out_set, rc = -ENOMEM);
1189                 }
1190                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1191                        sizeof(*req->rq_oi.oi_oa));
1192                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1193                 LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) 
1194                                 || req->rq_oi.oi_oa->o_gr>0);
1195                 req->rq_oi.oi_oa->o_stripe_idx = i;
1196                 req->rq_oi.oi_cb_up = cb_setattr_update;
1197                 req->rq_rqset = set;
1198
1199                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1200                         int off = lov_stripe_offset(oinfo->oi_md,
1201                                                     oinfo->oi_oa->o_size, i,
1202                                                     &req->rq_oi.oi_oa->o_size);
1203
1204                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1205                                 req->rq_oi.oi_oa->o_size--;
1206
1207                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1208                                i, req->rq_oi.oi_oa->o_size,
1209                                oinfo->oi_oa->o_size);
1210                 }
1211                 lov_set_add_req(req, set);
1212         }
1213         if (!set->set_count)
1214                 GOTO(out_set, rc = -EIO);
1215         *reqset = set;
1216         RETURN(rc);
1217 out_set:
1218         lov_fini_setattr_set(set);
1219         RETURN(rc);
1220 }
1221
1222 int lov_fini_punch_set(struct lov_request_set *set)
1223 {
1224         int rc = 0;
1225         ENTRY;
1226
1227         if (set == NULL)
1228                 RETURN(0);
1229         LASSERT(set->set_exp);
1230         if (set->set_completes) {
1231                 if (!set->set_success)
1232                         rc = -EIO;
1233                 /* FIXME update qos data here */
1234         }
1235
1236         if (atomic_dec_and_test(&set->set_refcount))
1237                 lov_finish_set(set);
1238
1239         RETURN(rc);
1240 }
1241
1242 /* The callback for osc_punch that finilizes a request info when a response
1243  * is recieved. */
1244 static int cb_update_punch(struct obd_info *oinfo, int rc)
1245 {
1246         struct lov_request *lovreq;
1247         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1248         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1249 }
1250
1251 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1252                        struct obd_trans_info *oti,
1253                        struct lov_request_set **reqset)
1254 {
1255         struct lov_request_set *set;
1256         struct lov_oinfo *loi = NULL;
1257         struct lov_obd *lov = &exp->exp_obd->u.lov;
1258         int rc = 0, i;
1259         ENTRY;
1260
1261         OBD_ALLOC(set, sizeof(*set));
1262         if (set == NULL)
1263                 RETURN(-ENOMEM);
1264         lov_init_set(set);
1265
1266         set->set_oi = oinfo;
1267         set->set_exp = exp;
1268
1269         loi = oinfo->oi_md->lsm_oinfo;
1270         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
1271                 struct lov_request *req;
1272                 obd_off rs, re;
1273
1274                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1275                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1276                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1277                         continue;
1278                 }
1279
1280                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1281                                            oinfo->oi_policy.l_extent.start,
1282                                            oinfo->oi_policy.l_extent.end,
1283                                            &rs, &re))
1284                         continue;
1285
1286                 OBD_ALLOC(req, sizeof(*req));
1287                 if (req == NULL)
1288                         GOTO(out_set, rc = -ENOMEM);
1289                 req->rq_stripe = i;
1290                 req->rq_idx = loi->loi_ost_idx;
1291
1292                 req->rq_oi.oi_oa = obdo_alloc();
1293                 if (req->rq_oi.oi_oa == NULL) {
1294                         OBD_FREE(req, sizeof(*req));
1295                         GOTO(out_set, rc = -ENOMEM);
1296                 }
1297                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1298                        sizeof(*req->rq_oi.oi_oa));
1299                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1300                 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1301                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1302
1303                 req->rq_oi.oi_oa->o_stripe_idx = i;
1304                 req->rq_oi.oi_cb_up = cb_update_punch;
1305                 req->rq_rqset = set;
1306
1307                 req->rq_oi.oi_policy.l_extent.start = rs;
1308                 req->rq_oi.oi_policy.l_extent.end = re;
1309                 req->rq_oi.oi_policy.l_extent.gid = -1;
1310
1311                 lov_set_add_req(req, set);
1312         }
1313         if (!set->set_count)
1314                 GOTO(out_set, rc = -EIO);
1315         *reqset = set;
1316         RETURN(rc);
1317 out_set:
1318         lov_fini_punch_set(set);
1319         RETURN(rc);
1320 }
1321
1322 int lov_fini_sync_set(struct lov_request_set *set)
1323 {
1324         int rc = 0;
1325         ENTRY;
1326
1327         if (set == NULL)
1328                 RETURN(0);
1329         LASSERT(set->set_exp);
1330         if (set->set_completes) {
1331                 if (!set->set_success)
1332                         rc = -EIO;
1333                 /* FIXME update qos data here */
1334         }
1335
1336         if (atomic_dec_and_test(&set->set_refcount))
1337                 lov_finish_set(set);
1338
1339         RETURN(rc);
1340 }
1341
1342 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1343                       struct obdo *src_oa, struct lov_stripe_md *lsm,
1344                       obd_off start, obd_off end,
1345                       struct lov_request_set **reqset)
1346 {
1347         struct lov_request_set *set;
1348         struct lov_oinfo *loi = NULL;
1349         struct lov_obd *lov = &exp->exp_obd->u.lov;
1350         int rc = 0, i;
1351         ENTRY;
1352
1353         OBD_ALLOC(set, sizeof(*set));
1354         if (set == NULL)
1355                 RETURN(-ENOMEM);
1356         lov_init_set(set);
1357
1358         set->set_exp = exp;
1359         set->set_oi = oinfo;
1360         set->set_oi->oi_md = lsm;
1361         set->set_oi->oi_oa = src_oa;
1362
1363         loi = lsm->lsm_oinfo;
1364         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1365                 struct lov_request *req;
1366                 obd_off rs, re;
1367
1368                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1369                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1370                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1371                         continue;
1372                 }
1373
1374                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1375                         continue;
1376
1377                 OBD_ALLOC(req, sizeof(*req));
1378                 if (req == NULL)
1379                         GOTO(out_set, rc = -ENOMEM);
1380                 req->rq_stripe = i;
1381                 req->rq_idx = loi->loi_ost_idx;
1382
1383                 req->rq_oi.oi_oa = obdo_alloc();
1384                 if (req->rq_oi.oi_oa == NULL) {
1385                         OBD_FREE(req, sizeof(*req));
1386                         GOTO(out_set, rc = -ENOMEM);
1387                 }
1388                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1389                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1390                 req->rq_oi.oi_oa->o_stripe_idx = i;
1391
1392                 req->rq_oi.oi_policy.l_extent.start = rs;
1393                 req->rq_oi.oi_policy.l_extent.end = re;
1394                 req->rq_oi.oi_policy.l_extent.gid = -1;
1395
1396                 lov_set_add_req(req, set);
1397         }
1398         if (!set->set_count)
1399                 GOTO(out_set, rc = -EIO);
1400         *reqset = set;
1401         RETURN(rc);
1402 out_set:
1403         lov_fini_sync_set(set);
1404         RETURN(rc);
1405 }
1406
/* Largest value representable in a __u64. */
#define LOV_U64_MAX ((__u64)~0ULL)

/*
 * Saturating add: tot += add, except that a sum which wraps around (i.e.
 * comes out smaller than tot) pins tot at LOV_U64_MAX instead.
 * Both arguments are evaluated more than once.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1415
1416 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1417 {
1418         ENTRY;
1419
1420         if (success) {
1421                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1422
1423                 if (osfs->os_files != LOV_U64_MAX)
1424                         do_div(osfs->os_files, expected_stripes);
1425                 if (osfs->os_ffree != LOV_U64_MAX)
1426                         do_div(osfs->os_ffree, expected_stripes);
1427
1428                 spin_lock(&obd->obd_osfs_lock);
1429                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1430                 obd->obd_osfs_age = get_jiffies_64();
1431                 spin_unlock(&obd->obd_osfs_lock);
1432                 RETURN(0);
1433         }
1434
1435         RETURN(-EIO);
1436 }
1437
1438 int lov_fini_statfs_set(struct lov_request_set *set)
1439 {
1440         int rc = 0;
1441         ENTRY;
1442
1443         if (set == NULL)
1444                 RETURN(0);
1445
1446         if (set->set_completes) {
1447                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1448                                      set->set_success);
1449         }
1450
1451         if (atomic_dec_and_test(&set->set_refcount))
1452                 lov_finish_set(set);
1453
1454         RETURN(rc);
1455 }
1456
1457 void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1458                        struct obd_statfs *lov_sfs, int success)
1459 {
1460         spin_lock(&obd->obd_osfs_lock);
1461         memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1462         obd->obd_osfs_age = get_jiffies_64();
1463         spin_unlock(&obd->obd_osfs_lock);
1464
1465         if (success == 0) {
1466                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1467         } else {
1468 #ifdef MIN_DF
1469                 /* Sandia requested that df (and so, statfs) only
1470                    returned minimal available space on
1471                    a single OST, so people would be able to
1472                    write this much data guaranteed. */
1473                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1474                         /* Presumably if new bavail is smaller,
1475                            new bfree is bigger as well */
1476                         osfs->os_bfree = lov_sfs->os_bfree;
1477                         osfs->os_bavail = lov_sfs->os_bavail;
1478                 }
1479 #else
1480                 osfs->os_bfree += lov_sfs->os_bfree;
1481                 osfs->os_bavail += lov_sfs->os_bavail;
1482 #endif
1483                 osfs->os_blocks += lov_sfs->os_blocks;
1484                 /* XXX not sure about this one - depends on policy.
1485                  *   - could be minimum if we always stripe on all OBDs
1486                  *     (but that would be wrong for any other policy,
1487                  *     if one of the OBDs has no more objects left)
1488                  *   - could be sum if we stripe whole objects
1489                  *   - could be average, just to give a nice number
1490                  *
1491                  * To give a "reasonable" (if not wholly accurate)
1492                  * number, we divide the total number of free objects
1493                  * by expected stripe count (watch out for overflow).
1494                  */
1495                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1496                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1497         }
1498 }
1499
1500 /* The callback for osc_statfs_async that finilizes a request info when a
1501  * response is recieved. */
1502 static int cb_statfs_update(struct obd_info *oinfo, int rc)
1503 {
1504         struct lov_request *lovreq;
1505         struct obd_statfs *osfs, *lov_sfs;
1506         struct obd_device *obd;
1507         struct lov_obd *lov;
1508         int success;
1509         ENTRY;
1510
1511         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1512         lov = &lovreq->rq_rqset->set_obd->u.lov;
1513         obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1514
1515         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1516         lov_sfs = oinfo->oi_osfs;
1517
1518         success = lovreq->rq_rqset->set_success;
1519
1520         /* XXX: the same is done in lov_update_common_set, however
1521            lovset->set_exp is not initialized. */
1522         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1523         if (rc) {
1524                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1525                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1526                         rc = 0;
1527                 RETURN(rc);
1528         }
1529
1530         lov_update_statfs(obd, osfs, lov_sfs, success);
1531         RETURN(0);
1532 }
1533
1534 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1535                         struct lov_request_set **reqset)
1536 {
1537         struct lov_request_set *set;
1538         struct lov_obd *lov = &obd->u.lov;
1539         int rc = 0, i;
1540         ENTRY;
1541
1542         OBD_ALLOC(set, sizeof(*set));
1543         if (set == NULL)
1544                 RETURN(-ENOMEM);
1545         lov_init_set(set);
1546
1547         set->set_obd = obd;
1548         set->set_oi = oinfo;
1549
1550         /* We only get block data from the OBD */
1551         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1552                 struct lov_request *req;
1553
1554                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
1555                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1556                         continue;
1557                 }
1558
1559                 OBD_ALLOC(req, sizeof(*req));
1560                 if (req == NULL)
1561                         GOTO(out_set, rc = -ENOMEM);
1562
1563                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1564                 if (req->rq_oi.oi_osfs == NULL) {
1565                         OBD_FREE(req, sizeof(*req));
1566                         GOTO(out_set, rc = -ENOMEM);
1567                 }
1568
1569                 req->rq_idx = i;
1570                 req->rq_oi.oi_cb_up = cb_statfs_update;
1571                 req->rq_rqset = set;
1572
1573                 lov_set_add_req(req, set);
1574         }
1575         if (!set->set_count)
1576                 GOTO(out_set, rc = -EIO);
1577         *reqset = set;
1578         RETURN(rc);
1579 out_set:
1580         lov_fini_statfs_set(set);
1581         RETURN(rc);
1582 }