Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29
30 #ifdef __KERNEL__
31 #include <libcfs/libcfs.h>
32 #else
33 #include <liblustre.h>
34 #endif
35
36 #include <obd_class.h>
37 #include <obd_lov.h>
38 #include <lustre/lustre_idl.h>
39
40 #include "lov_internal.h"
41
42 static void lov_init_set(struct lov_request_set *set)
43 {
44         set->set_count = 0;
45         set->set_completes = 0;
46         set->set_success = 0;
47         set->set_cookies = 0;
48         CFS_INIT_LIST_HEAD(&set->set_list);
49         atomic_set(&set->set_refcount, 1);
50 }
51
52 static void lov_finish_set(struct lov_request_set *set)
53 {
54         struct list_head *pos, *n;
55         ENTRY;
56
57         LASSERT(set);
58         list_for_each_safe(pos, n, &set->set_list) {
59                 struct lov_request *req = list_entry(pos, struct lov_request,
60                                                      rq_link);
61                 list_del_init(&req->rq_link);
62
63                 if (req->rq_oi.oi_oa)
64                         OBDO_FREE(req->rq_oi.oi_oa);
65                 if (req->rq_oi.oi_md)
66                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
67                 if (req->rq_oi.oi_osfs)
68                         OBD_FREE(req->rq_oi.oi_osfs,
69                                  sizeof(*req->rq_oi.oi_osfs));
70                 OBD_FREE(req, sizeof(*req));
71         }
72
73         if (set->set_pga) {
74                 int len = set->set_oabufs * sizeof(*set->set_pga);
75                 OBD_FREE(set->set_pga, len);
76         }
77         if (set->set_lockh)
78                 lov_llh_put(set->set_lockh);
79
80         OBD_FREE(set, sizeof(*set));
81         EXIT;
82 }
83
84 void lov_update_set(struct lov_request_set *set,
85                     struct lov_request *req, int rc)
86 {
87         req->rq_complete = 1;
88         req->rq_rc = rc;
89
90         set->set_completes++;
91         if (rc == 0)
92                 set->set_success++;
93 }
94
95 int lov_update_common_set(struct lov_request_set *set,
96                           struct lov_request *req, int rc)
97 {
98         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
99         ENTRY;
100
101         lov_update_set(set, req, rc);
102
103         /* grace error on inactive ost */
104         if (rc && !(lov->lov_tgts[req->rq_idx] && 
105                     lov->lov_tgts[req->rq_idx]->ltd_active))
106                 rc = 0;
107
108         /* FIXME in raid1 regime, should return 0 */
109         RETURN(rc);
110 }
111
112 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
113 {
114         list_add_tail(&req->rq_link, &set->set_list);
115         set->set_count++;
116 }
117
118 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
119 {
120         struct lov_request_set *set = req->rq_rqset;
121         struct lustre_handle *lov_lockhp;
122         struct lov_oinfo *loi;
123         ENTRY;
124
125         LASSERT(set != NULL);
126         LASSERT(set->set_oi != NULL);
127
128         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
129         loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
130
131         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
132          * and that copy can be arbitrarily out of date.
133          *
134          * The LOV API is due for a serious rewriting anyways, and this
135          * can be addressed then. */
136
137         if (rc == ELDLM_OK) {
138                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
139                 __u64 tmp;
140
141                 LASSERT(lock != NULL);
142                 lov_stripe_lock(set->set_oi->oi_md);
143                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
144                 tmp = loi->loi_lvb.lvb_size;
145                 /* Extend KMS up to the end of this lock and no further
146                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
147                 if (tmp > lock->l_policy_data.l_extent.end)
148                         tmp = lock->l_policy_data.l_extent.end + 1;
149                 if (tmp >= loi->loi_kms) {
150                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
151                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
152                         loi->loi_kms = tmp;
153                         loi->loi_kms_valid = 1;
154                 } else {
155                         LDLM_DEBUG(lock, "lock acquired, setting rss="
156                                    LPU64"; leaving kms="LPU64", end="LPU64,
157                                    loi->loi_lvb.lvb_size, loi->loi_kms,
158                                    lock->l_policy_data.l_extent.end);
159                 }
160                 lov_stripe_unlock(set->set_oi->oi_md);
161                 ldlm_lock_allow_match(lock);
162                 LDLM_LOCK_PUT(lock);
163         } else if ((rc == ELDLM_LOCK_ABORTED) &&
164                    (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {
165                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
166                 lov_stripe_lock(set->set_oi->oi_md);
167                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
168                 lov_stripe_unlock(set->set_oi->oi_md);
169                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
170                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
171                 rc = ELDLM_OK;
172         } else {
173                 struct obd_export *exp = set->set_exp;
174                 struct lov_obd *lov = &exp->exp_obd->u.lov;
175
176                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
177                 if (lov->lov_tgts[req->rq_idx] && 
178                     lov->lov_tgts[req->rq_idx]->ltd_active) {
179                         /* -EUSERS used by OST to report file contention */
180                         if (rc != -EINTR && rc != -EUSERS)
181                                 CERROR("enqueue objid "LPX64" subobj "
182                                        LPX64" on OST idx %d: rc %d\n",
183                                        set->set_oi->oi_md->lsm_object_id,
184                                        loi->loi_id, loi->loi_ost_idx, rc);
185                 } else {
186                         rc = ELDLM_OK;
187                 }
188         }
189         lov_update_set(set, req, rc);
190         RETURN(rc);
191 }
192
193 /* The callback for osc_enqueue that updates lov info for every OSC request. */
194 static int cb_update_enqueue(struct obd_info *oinfo, int rc)
195 {
196         struct ldlm_enqueue_info *einfo;
197         struct lov_request *lovreq;
198
199         lovreq = container_of(oinfo, struct lov_request, rq_oi);
200         einfo = lovreq->rq_rqset->set_ei;
201         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
202 }
203
204 static int enqueue_done(struct lov_request_set *set, __u32 mode)
205 {
206         struct lov_request *req;
207         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
208         int rc = 0;
209         ENTRY;
210
211         /* enqueue/match success, just return */
212         if (set->set_completes && set->set_completes == set->set_success)
213                 RETURN(0);
214
215         /* cancel enqueued/matched locks */
216         list_for_each_entry(req, &set->set_list, rq_link) {
217                 struct lustre_handle *lov_lockhp;
218
219                 if (!req->rq_complete || req->rq_rc)
220                         continue;
221
222                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
223                 LASSERT(lov_lockhp);
224                 if (!lustre_handle_is_used(lov_lockhp))
225                         continue;
226
227                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
228                                 req->rq_oi.oi_md, mode, lov_lockhp);
229                 if (rc && lov->lov_tgts[req->rq_idx] &&
230                     lov->lov_tgts[req->rq_idx]->ltd_active)
231                         CERROR("cancelling obdjid "LPX64" on OST "
232                                "idx %d error: rc = %d\n",
233                                req->rq_oi.oi_md->lsm_object_id,
234                                req->rq_idx, rc);
235         }
236         if (set->set_lockh)
237                 lov_llh_put(set->set_lockh);
238         RETURN(rc);
239 }
240
241 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
242                          struct ptlrpc_request_set *rqset)
243 {
244         int ret = 0;
245         ENTRY;
246
247         if (set == NULL)
248                 RETURN(0);
249         LASSERT(set->set_exp);
250         /* Do enqueue_done only for sync requests and if any request
251          * succeeded. */
252         if (!rqset) {
253                 if (rc)
254                         set->set_completes = 0;
255                 ret = enqueue_done(set, mode);
256         } else if (set->set_lockh)
257                 lov_llh_put(set->set_lockh);
258
259         if (atomic_dec_and_test(&set->set_refcount))
260                 lov_finish_set(set);
261
262         RETURN(rc ? rc : ret);
263 }
264
265 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
266                          struct ldlm_enqueue_info *einfo,
267                          struct lov_request_set **reqset)
268 {
269         struct lov_obd *lov = &exp->exp_obd->u.lov;
270         struct lov_request_set *set;
271         int i, rc = 0;
272         ENTRY;
273
274         OBD_ALLOC(set, sizeof(*set));
275         if (set == NULL)
276                 RETURN(-ENOMEM);
277         lov_init_set(set);
278
279         set->set_exp = exp;
280         set->set_oi = oinfo;
281         set->set_ei = einfo;
282         set->set_lockh = lov_llh_new(oinfo->oi_md);
283         if (set->set_lockh == NULL)
284                 GOTO(out_set, rc = -ENOMEM);
285         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
286
287         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
288                 struct lov_oinfo *loi;
289                 struct lov_request *req;
290                 obd_off start, end;
291
292                 loi = oinfo->oi_md->lsm_oinfo[i];
293                 if (!lov_stripe_intersects(oinfo->oi_md, i,
294                                            oinfo->oi_policy.l_extent.start,
295                                            oinfo->oi_policy.l_extent.end,
296                                            &start, &end))
297                         continue;
298
299                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
300                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
301                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
302                         continue;
303                 }
304
305                 OBD_ALLOC(req, sizeof(*req));
306                 if (req == NULL)
307                         GOTO(out_set, rc = -ENOMEM);
308
309                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
310                         sizeof(struct lov_oinfo *) +
311                         sizeof(struct lov_oinfo);
312                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
313                 if (req->rq_oi.oi_md == NULL) {
314                         OBD_FREE(req, sizeof(*req));
315                         GOTO(out_set, rc = -ENOMEM);
316                 }
317                 req->rq_oi.oi_md->lsm_oinfo[0] =
318                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
319                         sizeof(struct lov_oinfo *);
320
321
322                 req->rq_rqset = set;
323                 /* Set lov request specific parameters. */
324                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
325                 req->rq_oi.oi_cb_up = cb_update_enqueue;
326                 req->rq_oi.oi_flags = oinfo->oi_flags;
327
328                 LASSERT(req->rq_oi.oi_lockh);
329
330                 req->rq_oi.oi_policy.l_extent.gid =
331                         oinfo->oi_policy.l_extent.gid;
332                 req->rq_oi.oi_policy.l_extent.start = start;
333                 req->rq_oi.oi_policy.l_extent.end = end;
334
335                 req->rq_idx = loi->loi_ost_idx;
336                 req->rq_stripe = i;
337
338                 /* XXX LOV STACKING: submd should be from the subobj */
339                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
340                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
341                 req->rq_oi.oi_md->lsm_stripe_count = 0;
342                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
343                         loi->loi_kms_valid;
344                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
345                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
346
347                 lov_set_add_req(req, set);
348         }
349         if (!set->set_count)
350                 GOTO(out_set, rc = -EIO);
351         *reqset = set;
352         RETURN(0);
353 out_set:
354         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
355         RETURN(rc);
356 }
357
358 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
359                          int rc)
360 {
361         int ret = rc;
362         ENTRY;
363
364         if (rc > 0)
365                 ret = 0;
366         else if (rc == 0)
367                 ret = 1;
368         lov_update_set(set, req, ret);
369         RETURN(rc);
370 }
371
372 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
373 {
374         int rc = 0;
375         ENTRY;
376
377         if (set == NULL)
378                 RETURN(0);
379         LASSERT(set->set_exp);
380         rc = enqueue_done(set, mode);
381         if ((set->set_count == set->set_success) &&
382             (flags & LDLM_FL_TEST_LOCK))
383                 lov_llh_put(set->set_lockh);
384
385         if (atomic_dec_and_test(&set->set_refcount))
386                 lov_finish_set(set);
387
388         RETURN(rc);
389 }
390
391 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
392                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
393                        __u32 mode, struct lustre_handle *lockh,
394                        struct lov_request_set **reqset)
395 {
396         struct lov_obd *lov = &exp->exp_obd->u.lov;
397         struct lov_request_set *set;
398         int i, rc = 0;
399         ENTRY;
400
401         OBD_ALLOC(set, sizeof(*set));
402         if (set == NULL)
403                 RETURN(-ENOMEM);
404         lov_init_set(set);
405
406         set->set_exp = exp;
407         set->set_oi = oinfo;
408         set->set_oi->oi_md = lsm;
409         set->set_lockh = lov_llh_new(lsm);
410         if (set->set_lockh == NULL)
411                 GOTO(out_set, rc = -ENOMEM);
412         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
413
414         for (i = 0; i < lsm->lsm_stripe_count; i++){
415                 struct lov_oinfo *loi;
416                 struct lov_request *req;
417                 obd_off start, end;
418
419                 loi = lsm->lsm_oinfo[i];
420                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
421                                            policy->l_extent.end, &start, &end))
422                         continue;
423
424                 /* FIXME raid1 should grace this error */
425                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
426                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
427                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
428                         GOTO(out_set, rc = -EIO);
429                 }
430
431                 OBD_ALLOC(req, sizeof(*req));
432                 if (req == NULL)
433                         GOTO(out_set, rc = -ENOMEM);
434
435                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
436                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
437                 if (req->rq_oi.oi_md == NULL) {
438                         OBD_FREE(req, sizeof(*req));
439                         GOTO(out_set, rc = -ENOMEM);
440                 }
441
442                 req->rq_oi.oi_policy.l_extent.start = start;
443                 req->rq_oi.oi_policy.l_extent.end = end;
444                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
445
446                 req->rq_idx = loi->loi_ost_idx;
447                 req->rq_stripe = i;
448
449                 /* XXX LOV STACKING: submd should be from the subobj */
450                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
451                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
452                 req->rq_oi.oi_md->lsm_stripe_count = 0;
453
454                 lov_set_add_req(req, set);
455         }
456         if (!set->set_count)
457                 GOTO(out_set, rc = -EIO);
458         *reqset = set;
459         RETURN(rc);
460 out_set:
461         lov_fini_match_set(set, mode, 0);
462         RETURN(rc);
463 }
464
465 int lov_fini_cancel_set(struct lov_request_set *set)
466 {
467         int rc = 0;
468         ENTRY;
469
470         if (set == NULL)
471                 RETURN(0);
472
473         LASSERT(set->set_exp);
474         if (set->set_lockh)
475                 lov_llh_put(set->set_lockh);
476
477         if (atomic_dec_and_test(&set->set_refcount))
478                 lov_finish_set(set);
479
480         RETURN(rc);
481 }
482
483 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
484                         struct lov_stripe_md *lsm, __u32 mode,
485                         struct lustre_handle *lockh,
486                         struct lov_request_set **reqset)
487 {
488         struct lov_request_set *set;
489         int i, rc = 0;
490         ENTRY;
491
492         OBD_ALLOC(set, sizeof(*set));
493         if (set == NULL)
494                 RETURN(-ENOMEM);
495         lov_init_set(set);
496
497         set->set_exp = exp;
498         set->set_oi = oinfo;
499         set->set_oi->oi_md = lsm;
500         set->set_lockh = lov_handle2llh(lockh);
501         if (set->set_lockh == NULL) {
502                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
503                 GOTO(out_set, rc = -EINVAL);
504         }
505         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
506
507         for (i = 0; i < lsm->lsm_stripe_count; i++){
508                 struct lov_request *req;
509                 struct lustre_handle *lov_lockhp;
510                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
511
512                 lov_lockhp = set->set_lockh->llh_handles + i;
513                 if (!lustre_handle_is_used(lov_lockhp)) {
514                         CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n",
515                                loi->loi_ost_idx, loi->loi_id);
516                         continue;
517                 }
518
519                 OBD_ALLOC(req, sizeof(*req));
520                 if (req == NULL)
521                         GOTO(out_set, rc = -ENOMEM);
522
523                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
524                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
525                 if (req->rq_oi.oi_md == NULL) {
526                         OBD_FREE(req, sizeof(*req));
527                         GOTO(out_set, rc = -ENOMEM);
528                 }
529
530                 req->rq_idx = loi->loi_ost_idx;
531                 req->rq_stripe = i;
532
533                 /* XXX LOV STACKING: submd should be from the subobj */
534                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
535                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
536                 req->rq_oi.oi_md->lsm_stripe_count = 0;
537
538                 lov_set_add_req(req, set);
539         }
540         if (!set->set_count)
541                 GOTO(out_set, rc = -EIO);
542         *reqset = set;
543         RETURN(rc);
544 out_set:
545         lov_fini_cancel_set(set);
546         RETURN(rc);
547 }
548
549 static int create_done(struct obd_export *exp, struct lov_request_set *set,
550                        struct lov_stripe_md **lsmp)
551 {
552         struct lov_obd *lov = &exp->exp_obd->u.lov;
553         struct obd_trans_info *oti = set->set_oti;
554         struct obdo *src_oa = set->set_oi->oi_oa;
555         struct lov_request *req;
556         struct obdo *ret_oa = NULL;
557         int attrset = 0, rc = 0;
558         ENTRY;
559
560         LASSERT(set->set_completes);
561
562         /* try alloc objects on other osts if osc_create fails for
563          * exceptions: RPC failure, ENOSPC, etc */
564         if (set->set_count != set->set_success) {
565                 list_for_each_entry (req, &set->set_list, rq_link) {
566                         if (req->rq_rc == 0)
567                                 continue;
568
569                         set->set_completes--;
570                         req->rq_complete = 0;
571
572                         rc = qos_remedy_create(set, req);
573                         lov_update_create_set(set, req, rc);
574
575                         if (rc)
576                                 break;
577                 }
578         }
579
580         /* no successful creates */
581         if (set->set_success == 0)
582                 GOTO(cleanup, rc);
583
584         /* If there was an explicit stripe set, fail.  Otherwise, we
585          * got some objects and that's not bad. */
586         if (set->set_count != set->set_success) {
587                 if (*lsmp)
588                         GOTO(cleanup, rc);
589                 set->set_count = set->set_success;
590                 qos_shrink_lsm(set);
591         }
592
593         OBDO_ALLOC(ret_oa);
594         if (ret_oa == NULL)
595                 GOTO(cleanup, rc = -ENOMEM);
596
597         list_for_each_entry(req, &set->set_list, rq_link) {
598                 if (!req->rq_complete || req->rq_rc)
599                         continue;
600                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
601                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
602                                 req->rq_stripe, &attrset);
603         }
604         if (src_oa->o_valid & OBD_MD_FLSIZE &&
605             ret_oa->o_size != src_oa->o_size) {
606                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
607                        src_oa->o_size, ret_oa->o_size);
608                 LBUG();
609         }
610         ret_oa->o_id = src_oa->o_id;
611         ret_oa->o_gr = src_oa->o_gr;
612         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
613         memcpy(src_oa, ret_oa, sizeof(*src_oa));
614         OBDO_FREE(ret_oa);
615
616         *lsmp = set->set_oi->oi_md;
617         GOTO(done, rc = 0);
618
619 cleanup:
620         list_for_each_entry(req, &set->set_list, rq_link) {
621                 struct obd_export *sub_exp;
622                 int err = 0;
623
624                 if (!req->rq_complete || req->rq_rc)
625                         continue;
626
627                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
628                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
629                 if (err)
630                         CERROR("Failed to uncreate objid "LPX64" subobj "
631                                LPX64" on OST idx %d: rc = %d\n",
632                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
633                                req->rq_idx, rc);
634         }
635         if (*lsmp == NULL)
636                 obd_free_memmd(exp, &set->set_oi->oi_md);
637 done:
638         if (oti && set->set_cookies) {
639                 oti->oti_logcookies = set->set_cookies;
640                 if (!set->set_cookie_sent) {
641                         oti_free_cookies(oti);
642                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
643                 } else {
644                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
645                 }
646         }
647         RETURN(rc);
648 }
649
650 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
651 {
652         int rc = 0;
653         ENTRY;
654
655         if (set == NULL)
656                 RETURN(0);
657         LASSERT(set->set_exp);
658         if (set->set_completes)
659                 rc = create_done(set->set_exp, set, lsmp);
660
661         if (atomic_dec_and_test(&set->set_refcount))
662                 lov_finish_set(set);
663
664         RETURN(rc);
665 }
666
667 int lov_update_create_set(struct lov_request_set *set,
668                           struct lov_request *req, int rc)
669 {
670         struct obd_trans_info *oti = set->set_oti;
671         struct lov_stripe_md *lsm = set->set_oi->oi_md;
672         struct lov_oinfo *loi;
673         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
674         ENTRY;
675
676         req->rq_stripe = set->set_success;
677         loi = lsm->lsm_oinfo[req->rq_stripe];
678
679         if (rc && lov->lov_tgts[req->rq_idx] &&
680             lov->lov_tgts[req->rq_idx]->ltd_active) {
681                 CERROR("error creating fid "LPX64" sub-object"
682                        " on OST idx %d/%d: rc = %d\n",
683                        set->set_oi->oi_oa->o_id, req->rq_idx,
684                        lsm->lsm_stripe_count, rc);
685                 if (rc > 0) {
686                         CERROR("obd_create returned invalid err %d\n", rc);
687                         rc = -EIO;
688                 }
689         }
690         lov_update_set(set, req, rc);
691         if (rc)
692                 RETURN(rc);
693
694         loi->loi_id = req->rq_oi.oi_oa->o_id;
695         loi->loi_gr = req->rq_oi.oi_oa->o_gr;
696         loi->loi_ost_idx = req->rq_idx;
697         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
698                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
699         loi_init(loi);
700
701         if (oti && set->set_cookies)
702                 ++oti->oti_logcookies;
703         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
704                 set->set_cookie_sent++;
705
706         RETURN(0);
707 }
708
709 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
710                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
711                         struct obd_trans_info *oti,
712                         struct lov_request_set **reqset)
713 {
714         struct lov_request_set *set;
715         int rc = 0;
716         ENTRY;
717
718         OBD_ALLOC(set, sizeof(*set));
719         if (set == NULL)
720                 RETURN(-ENOMEM);
721         lov_init_set(set);
722
723         set->set_exp = exp;
724         set->set_oi = oinfo;
725         set->set_oi->oi_md = *lsmp;
726         set->set_oi->oi_oa = src_oa;
727         set->set_oti = oti;
728
729         rc = qos_prep_create(exp, set);
730         if (rc)
731                 lov_fini_create_set(set, lsmp);
732         else
733                 *reqset = set;
734         RETURN(rc);
735 }
736
737 static int common_attr_done(struct lov_request_set *set)
738 {
739         struct list_head *pos;
740         struct lov_request *req;
741         struct obdo *tmp_oa;
742         int rc = 0, attrset = 0;
743         ENTRY;
744
745         LASSERT(set->set_oi != NULL);
746
747         if (set->set_oi->oi_oa == NULL)
748                 RETURN(0);
749
750         if (!set->set_success)
751                 RETURN(-EIO);
752
753         OBDO_ALLOC(tmp_oa);
754         if (tmp_oa == NULL)
755                 GOTO(out, rc = -ENOMEM);
756
757         list_for_each (pos, &set->set_list) {
758                 req = list_entry(pos, struct lov_request, rq_link);
759
760                 if (!req->rq_complete || req->rq_rc)
761                         continue;
762                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
763                         continue;
764                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
765                                 req->rq_oi.oi_oa->o_valid,
766                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
767         }
768         if (!attrset) {
769                 CERROR("No stripes had valid attrs\n");
770                 rc = -EIO;
771         }
772         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
773         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
774 out:
775         if (tmp_oa)
776                 OBDO_FREE(tmp_oa);
777         RETURN(rc);
778
779 }
780
781 static int brw_done(struct lov_request_set *set)
782 {
783         struct lov_stripe_md *lsm = set->set_oi->oi_md;
784         struct lov_oinfo     *loi = NULL;
785         struct list_head *pos;
786         struct lov_request *req;
787         ENTRY;
788
789         list_for_each (pos, &set->set_list) {
790                 req = list_entry(pos, struct lov_request, rq_link);
791
792                 if (!req->rq_complete || req->rq_rc)
793                         continue;
794
795                 loi = lsm->lsm_oinfo[req->rq_stripe];
796
797                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
798                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
799         }
800
801         RETURN(0);
802 }
803
804 int lov_fini_brw_set(struct lov_request_set *set)
805 {
806         int rc = 0;
807         ENTRY;
808
809         if (set == NULL)
810                 RETURN(0);
811         LASSERT(set->set_exp);
812         if (set->set_completes) {
813                 rc = brw_done(set);
814                 /* FIXME update qos data here */
815         }
816         if (atomic_dec_and_test(&set->set_refcount))
817                 lov_finish_set(set);
818
819         RETURN(rc);
820 }
821
822 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
823                      obd_count oa_bufs, struct brw_page *pga,
824                      struct obd_trans_info *oti,
825                      struct lov_request_set **reqset)
826 {
827         struct {
828                 obd_count       index;
829                 obd_count       count;
830                 obd_count       off;
831         } *info = NULL;
832         struct lov_request_set *set;
833         struct lov_obd *lov = &exp->exp_obd->u.lov;
834         int rc = 0, i, shift;
835         ENTRY;
836
837         OBD_ALLOC(set, sizeof(*set));
838         if (set == NULL)
839                 RETURN(-ENOMEM);
840         lov_init_set(set);
841
842         set->set_exp = exp;
843         set->set_oti = oti;
844         set->set_oi = oinfo;
845         set->set_oabufs = oa_bufs;
846         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
847         if (!set->set_pga)
848                 GOTO(out, rc = -ENOMEM);
849
850         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
851         if (!info)
852                 GOTO(out, rc = -ENOMEM);
853
854         /* calculate the page count for each stripe */
855         for (i = 0; i < oa_bufs; i++) {
856                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
857                 info[stripe].count++;
858         }
859
860         /* alloc and initialize lov request */
861         shift = 0;
862         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
863                 struct lov_oinfo *loi = NULL;
864                 struct lov_request *req;
865
866                 if (info[i].count == 0)
867                         continue;
868                 
869                 loi = oinfo->oi_md->lsm_oinfo[i];
870                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
871                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
872                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
873                         GOTO(out, rc = -EIO);
874                 }
875
876                 OBD_ALLOC(req, sizeof(*req));
877                 if (req == NULL)
878                         GOTO(out, rc = -ENOMEM);
879
880                 OBDO_ALLOC(req->rq_oi.oi_oa);
881                 if (req->rq_oi.oi_oa == NULL) {
882                         OBD_FREE(req, sizeof(*req));
883                         GOTO(out, rc = -ENOMEM);
884                 }
885
886                 if (oinfo->oi_oa) {
887                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
888                                sizeof(*req->rq_oi.oi_oa));
889                 }
890                 req->rq_oi.oi_oa->o_id = loi->loi_id;
891                 req->rq_oi.oi_oa->o_stripe_idx = i;
892
893                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
894                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
895                 if (req->rq_oi.oi_md == NULL) {
896                         OBDO_FREE(req->rq_oi.oi_oa);
897                         OBD_FREE(req, sizeof(*req));
898                         GOTO(out, rc = -ENOMEM);
899                 }
900
901                 req->rq_idx = loi->loi_ost_idx;
902                 req->rq_stripe = i;
903
904                 /* XXX LOV STACKING */
905                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
906                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
907                 req->rq_oabufs = info[i].count;
908                 req->rq_pgaidx = shift;
909                 shift += req->rq_oabufs;
910
911                 /* remember the index for sort brw_page array */
912                 info[i].index = req->rq_pgaidx;
913
914                 req->rq_oi.oi_capa = oinfo->oi_capa;
915
916                 lov_set_add_req(req, set);
917         }
918         if (!set->set_count)
919                 GOTO(out, rc = -EIO);
920
921         /* rotate & sort the brw_page array */
922         for (i = 0; i < oa_bufs; i++) {
923                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
924
925                 shift = info[stripe].index + info[stripe].off;
926                 LASSERT(shift < oa_bufs);
927                 set->set_pga[shift] = pga[i];
928                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
929                                   &set->set_pga[shift].off);
930                 info[stripe].off++;
931         }
932 out:
933         if (info)
934                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
935
936         if (rc == 0)
937                 *reqset = set;
938         else
939                 lov_fini_brw_set(set);
940
941         RETURN(rc);
942 }
943
944 int lov_fini_getattr_set(struct lov_request_set *set)
945 {
946         int rc = 0;
947         ENTRY;
948
949         if (set == NULL)
950                 RETURN(0);
951         LASSERT(set->set_exp);
952         if (set->set_completes)
953                 rc = common_attr_done(set);
954
955         if (atomic_dec_and_test(&set->set_refcount))
956                 lov_finish_set(set);
957
958         RETURN(rc);
959 }
960
961 /* The callback for osc_getattr_async that finilizes a request info when a
962  * response is recieved. */
963 static int cb_getattr_update(struct obd_info *oinfo, int rc)
964 {
965         struct lov_request *lovreq;
966         lovreq = container_of(oinfo, struct lov_request, rq_oi);
967         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
968 }
969
970 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
971                          struct lov_request_set **reqset)
972 {
973         struct lov_request_set *set;
974         struct lov_obd *lov = &exp->exp_obd->u.lov;
975         int rc = 0, i;
976         ENTRY;
977
978         OBD_ALLOC(set, sizeof(*set));
979         if (set == NULL)
980                 RETURN(-ENOMEM);
981         lov_init_set(set);
982
983         set->set_exp = exp;
984         set->set_oi = oinfo;
985
986         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
987                 struct lov_oinfo *loi;
988                 struct lov_request *req;
989
990                 loi = oinfo->oi_md->lsm_oinfo[i];
991                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
992                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
993                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
994                         continue;
995                 }
996
997                 OBD_ALLOC(req, sizeof(*req));
998                 if (req == NULL)
999                         GOTO(out_set, rc = -ENOMEM);
1000
1001                 req->rq_stripe = i;
1002                 req->rq_idx = loi->loi_ost_idx;
1003
1004                 OBDO_ALLOC(req->rq_oi.oi_oa);
1005                 if (req->rq_oi.oi_oa == NULL) {
1006                         OBD_FREE(req, sizeof(*req));
1007                         GOTO(out_set, rc = -ENOMEM);
1008                 }
1009                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1010                        sizeof(*req->rq_oi.oi_oa));
1011                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1012                 req->rq_oi.oi_cb_up = cb_getattr_update;
1013                 req->rq_oi.oi_capa = oinfo->oi_capa;
1014                 req->rq_rqset = set;
1015
1016                 lov_set_add_req(req, set);
1017         }
1018         if (!set->set_count)
1019                 GOTO(out_set, rc = -EIO);
1020         *reqset = set;
1021         RETURN(rc);
1022 out_set:
1023         lov_fini_getattr_set(set);
1024         RETURN(rc);
1025 }
1026
1027 int lov_fini_destroy_set(struct lov_request_set *set)
1028 {
1029         ENTRY;
1030
1031         if (set == NULL)
1032                 RETURN(0);
1033         LASSERT(set->set_exp);
1034         if (set->set_completes) {
1035                 /* FIXME update qos data here */
1036         }
1037
1038         if (atomic_dec_and_test(&set->set_refcount))
1039                 lov_finish_set(set);
1040
1041         RETURN(0);
1042 }
1043
1044 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1045                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1046                          struct obd_trans_info *oti,
1047                          struct lov_request_set **reqset)
1048 {
1049         struct lov_request_set *set;
1050         struct lov_obd *lov = &exp->exp_obd->u.lov;
1051         int rc = 0, i;
1052         ENTRY;
1053
1054         OBD_ALLOC(set, sizeof(*set));
1055         if (set == NULL)
1056                 RETURN(-ENOMEM);
1057         lov_init_set(set);
1058
1059         set->set_exp = exp;
1060         set->set_oi = oinfo;
1061         set->set_oi->oi_md = lsm;
1062         set->set_oi->oi_oa = src_oa;
1063         set->set_oti = oti;
1064         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1065                 set->set_cookies = oti->oti_logcookies;
1066
1067         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1068                 struct lov_oinfo *loi;
1069                 struct lov_request *req;
1070
1071                 loi = lsm->lsm_oinfo[i];
1072                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
1073                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1074                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1075                         continue;
1076                 }
1077
1078                 OBD_ALLOC(req, sizeof(*req));
1079                 if (req == NULL)
1080                         GOTO(out_set, rc = -ENOMEM);
1081
1082                 req->rq_stripe = i;
1083                 req->rq_idx = loi->loi_ost_idx;
1084
1085                 OBDO_ALLOC(req->rq_oi.oi_oa);
1086                 if (req->rq_oi.oi_oa == NULL) {
1087                         OBD_FREE(req, sizeof(*req));
1088                         GOTO(out_set, rc = -ENOMEM);
1089                 }
1090                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1091                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1092                 lov_set_add_req(req, set);
1093         }
1094         if (!set->set_count)
1095                 GOTO(out_set, rc = -EIO);
1096         *reqset = set;
1097         RETURN(rc);
1098 out_set:
1099         lov_fini_destroy_set(set);
1100         RETURN(rc);
1101 }
1102
1103 int lov_fini_setattr_set(struct lov_request_set *set)
1104 {
1105         int rc = 0;
1106         ENTRY;
1107
1108         if (set == NULL)
1109                 RETURN(0);
1110         LASSERT(set->set_exp);
1111         if (set->set_completes) {
1112                 rc = common_attr_done(set);
1113                 /* FIXME update qos data here */
1114         }
1115
1116         if (atomic_dec_and_test(&set->set_refcount))
1117                 lov_finish_set(set);
1118         RETURN(rc);
1119 }
1120
1121 int lov_update_setattr_set(struct lov_request_set *set,
1122                            struct lov_request *req, int rc)
1123 {
1124         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1125         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1126         ENTRY;
1127
1128         lov_update_set(set, req, rc);
1129
1130         /* grace error on inactive ost */
1131         if (rc && !(lov->lov_tgts[req->rq_idx] && 
1132                     lov->lov_tgts[req->rq_idx]->ltd_active))
1133                 rc = 0;
1134
1135         if (rc == 0) {
1136                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1137                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1138                                 req->rq_oi.oi_oa->o_ctime;
1139                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1140                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1141                                 req->rq_oi.oi_oa->o_mtime;
1142                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1143                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1144                                 req->rq_oi.oi_oa->o_atime;
1145         }
1146
1147         RETURN(rc);
1148 }
1149
1150 /* The callback for osc_setattr_async that finilizes a request info when a
1151  * response is recieved. */
1152 static int cb_setattr_update(struct obd_info *oinfo, int rc)
1153 {
1154         struct lov_request *lovreq;
1155         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1156         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1157 }
1158
1159 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1160                          struct obd_trans_info *oti,
1161                          struct lov_request_set **reqset)
1162 {
1163         struct lov_request_set *set;
1164         struct lov_obd *lov = &exp->exp_obd->u.lov;
1165         int rc = 0, i;
1166         ENTRY;
1167
1168         OBD_ALLOC(set, sizeof(*set));
1169         if (set == NULL)
1170                 RETURN(-ENOMEM);
1171         lov_init_set(set);
1172
1173         set->set_exp = exp;
1174         set->set_oti = oti;
1175         set->set_oi = oinfo;
1176         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1177                 set->set_cookies = oti->oti_logcookies;
1178
1179         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1180                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1181                 struct lov_request *req;
1182
1183                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1184                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1185                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1186                         continue;
1187                 }
1188
1189                 OBD_ALLOC(req, sizeof(*req));
1190                 if (req == NULL)
1191                         GOTO(out_set, rc = -ENOMEM);
1192                 req->rq_stripe = i;
1193                 req->rq_idx = loi->loi_ost_idx;
1194
1195                 OBDO_ALLOC(req->rq_oi.oi_oa);
1196                 if (req->rq_oi.oi_oa == NULL) {
1197                         OBD_FREE(req, sizeof(*req));
1198                         GOTO(out_set, rc = -ENOMEM);
1199                 }
1200                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1201                        sizeof(*req->rq_oi.oi_oa));
1202                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1203                 LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) 
1204                                 || req->rq_oi.oi_oa->o_gr>0);
1205                 req->rq_oi.oi_oa->o_stripe_idx = i;
1206                 req->rq_oi.oi_cb_up = cb_setattr_update;
1207                 req->rq_oi.oi_capa = oinfo->oi_capa;
1208                 req->rq_rqset = set;
1209
1210                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1211                         int off = lov_stripe_offset(oinfo->oi_md,
1212                                                     oinfo->oi_oa->o_size, i,
1213                                                     &req->rq_oi.oi_oa->o_size);
1214
1215                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1216                                 req->rq_oi.oi_oa->o_size--;
1217
1218                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1219                                i, req->rq_oi.oi_oa->o_size,
1220                                oinfo->oi_oa->o_size);
1221                 }
1222                 lov_set_add_req(req, set);
1223         }
1224         if (!set->set_count)
1225                 GOTO(out_set, rc = -EIO);
1226         *reqset = set;
1227         RETURN(rc);
1228 out_set:
1229         lov_fini_setattr_set(set);
1230         RETURN(rc);
1231 }
1232
1233 int lov_fini_punch_set(struct lov_request_set *set)
1234 {
1235         int rc = 0;
1236         ENTRY;
1237
1238         if (set == NULL)
1239                 RETURN(0);
1240         LASSERT(set->set_exp);
1241         if (set->set_completes) {
1242                 rc = -EIO;
1243                 /* FIXME update qos data here */
1244                 if (set->set_success)
1245                         rc = common_attr_done(set);
1246         }
1247
1248         if (atomic_dec_and_test(&set->set_refcount))
1249                 lov_finish_set(set);
1250
1251         RETURN(rc);
1252 }
1253
1254 int lov_update_punch_set(struct lov_request_set *set,
1255                          struct lov_request *req, int rc)
1256 {
1257         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1258         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1259         ENTRY;
1260
1261         lov_update_set(set, req, rc);
1262
1263         /* grace error on inactive ost */
1264         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1265                 rc = 0;
1266
1267         if (rc == 0) {
1268                 lov_stripe_lock(lsm);
1269                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1270                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1271                                 req->rq_oi.oi_oa->o_blocks;
1272                 }
1273
1274                 /* Do we need to update lvb_size here? It needn't because
1275                  * it have been done in ll_truncate(). -jay */
1276                 lov_stripe_unlock(lsm);
1277         }
1278
1279         RETURN(rc);
1280 }
1281
1282 /* The callback for osc_punch that finilizes a request info when a response
1283  * is recieved. */
1284 static int cb_update_punch(struct obd_info *oinfo, int rc)
1285 {
1286         struct lov_request *lovreq;
1287         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1288         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1289 }
1290
1291 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1292                        struct obd_trans_info *oti,
1293                        struct lov_request_set **reqset)
1294 {
1295         struct lov_request_set *set;
1296         struct lov_obd *lov = &exp->exp_obd->u.lov;
1297         int rc = 0, i;
1298         ENTRY;
1299
1300         OBD_ALLOC(set, sizeof(*set));
1301         if (set == NULL)
1302                 RETURN(-ENOMEM);
1303         lov_init_set(set);
1304
1305         set->set_oi = oinfo;
1306         set->set_exp = exp;
1307
1308         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1309                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1310                 struct lov_request *req;
1311                 obd_off rs, re;
1312
1313                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1314                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1315                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1316                         continue;
1317                 }
1318
1319                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1320                                            oinfo->oi_policy.l_extent.start,
1321                                            oinfo->oi_policy.l_extent.end,
1322                                            &rs, &re))
1323                         continue;
1324
1325                 OBD_ALLOC(req, sizeof(*req));
1326                 if (req == NULL)
1327                         GOTO(out_set, rc = -ENOMEM);
1328                 req->rq_stripe = i;
1329                 req->rq_idx = loi->loi_ost_idx;
1330
1331                 OBDO_ALLOC(req->rq_oi.oi_oa);
1332                 if (req->rq_oi.oi_oa == NULL) {
1333                         OBD_FREE(req, sizeof(*req));
1334                         GOTO(out_set, rc = -ENOMEM);
1335                 }
1336                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1337                        sizeof(*req->rq_oi.oi_oa));
1338                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1339                 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1340                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1341
1342                 req->rq_oi.oi_oa->o_stripe_idx = i;
1343                 req->rq_oi.oi_cb_up = cb_update_punch;
1344                 req->rq_rqset = set;
1345
1346                 req->rq_oi.oi_policy.l_extent.start = rs;
1347                 req->rq_oi.oi_policy.l_extent.end = re;
1348                 req->rq_oi.oi_policy.l_extent.gid = -1;
1349
1350                 req->rq_oi.oi_capa = oinfo->oi_capa;
1351
1352                 lov_set_add_req(req, set);
1353         }
1354         if (!set->set_count)
1355                 GOTO(out_set, rc = -EIO);
1356         *reqset = set;
1357         RETURN(rc);
1358 out_set:
1359         lov_fini_punch_set(set);
1360         RETURN(rc);
1361 }
1362
1363 int lov_fini_sync_set(struct lov_request_set *set)
1364 {
1365         int rc = 0;
1366         ENTRY;
1367
1368         if (set == NULL)
1369                 RETURN(0);
1370         LASSERT(set->set_exp);
1371         if (set->set_completes) {
1372                 if (!set->set_success)
1373                         rc = -EIO;
1374                 /* FIXME update qos data here */
1375         }
1376
1377         if (atomic_dec_and_test(&set->set_refcount))
1378                 lov_finish_set(set);
1379
1380         RETURN(rc);
1381 }
1382
1383 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1384                       struct obdo *src_oa, struct lov_stripe_md *lsm,
1385                       obd_off start, obd_off end,
1386                       struct lov_request_set **reqset)
1387 {
1388         struct lov_request_set *set;
1389         struct lov_obd *lov = &exp->exp_obd->u.lov;
1390         int rc = 0, i;
1391         ENTRY;
1392
1393         OBD_ALLOC(set, sizeof(*set));
1394         if (set == NULL)
1395                 RETURN(-ENOMEM);
1396         lov_init_set(set);
1397
1398         set->set_exp = exp;
1399         set->set_oi = oinfo;
1400         set->set_oi->oi_md = lsm;
1401         set->set_oi->oi_oa = src_oa;
1402
1403         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1404                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1405                 struct lov_request *req;
1406                 obd_off rs, re;
1407
1408                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1409                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1410                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1411                         continue;
1412                 }
1413
1414                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1415                         continue;
1416
1417                 OBD_ALLOC(req, sizeof(*req));
1418                 if (req == NULL)
1419                         GOTO(out_set, rc = -ENOMEM);
1420                 req->rq_stripe = i;
1421                 req->rq_idx = loi->loi_ost_idx;
1422
1423                 OBDO_ALLOC(req->rq_oi.oi_oa);
1424                 if (req->rq_oi.oi_oa == NULL) {
1425                         OBD_FREE(req, sizeof(*req));
1426                         GOTO(out_set, rc = -ENOMEM);
1427                 }
1428                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1429                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1430                 req->rq_oi.oi_oa->o_stripe_idx = i;
1431
1432                 req->rq_oi.oi_policy.l_extent.start = rs;
1433                 req->rq_oi.oi_policy.l_extent.end = re;
1434                 req->rq_oi.oi_policy.l_extent.gid = -1;
1435
1436                 lov_set_add_req(req, set);
1437         }
1438         if (!set->set_count)
1439                 GOTO(out_set, rc = -EIO);
1440         *reqset = set;
1441         RETURN(rc);
1442 out_set:
1443         lov_fini_sync_set(set);
1444         RETURN(rc);
1445 }
1446
/* Largest value representable in a __u64; used as the saturation value. */
#define LOV_U64_MAX ((__u64)~0ULL)

/* Saturating add for __u64 totals: tot += add, clamped to LOV_U64_MAX when
 * the sum would wrap.  @tot must be a __u64 lvalue (it is expanded more than
 * once); @add is evaluated exactly once, so it may have side effects. */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                __u64 lov_sum_add_ = (add);                             \
                                                                        \
                if ((tot) + lov_sum_add_ < (tot))                       \
                        (tot) = LOV_U64_MAX;                            \
                else                                                    \
                        (tot) += lov_sum_add_;                          \
        } while (0)
1455
1456 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1457 {
1458         ENTRY;
1459
1460         if (success) {
1461                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1462
1463                 if (osfs->os_files != LOV_U64_MAX)
1464                         do_div(osfs->os_files, expected_stripes);
1465                 if (osfs->os_ffree != LOV_U64_MAX)
1466                         do_div(osfs->os_ffree, expected_stripes);
1467
1468                 spin_lock(&obd->obd_osfs_lock);
1469                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1470                 obd->obd_osfs_age = get_jiffies_64();
1471                 spin_unlock(&obd->obd_osfs_lock);
1472                 RETURN(0);
1473         }
1474
1475         RETURN(-EIO);
1476 }
1477
1478 int lov_fini_statfs_set(struct lov_request_set *set)
1479 {
1480         int rc = 0;
1481         ENTRY;
1482
1483         if (set == NULL)
1484                 RETURN(0);
1485
1486         if (set->set_completes) {
1487                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1488                                      set->set_success);
1489         }
1490
1491         if (atomic_dec_and_test(&set->set_refcount))
1492                 lov_finish_set(set);
1493
1494         RETURN(rc);
1495 }
1496
1497 void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1498                        struct obd_statfs *lov_sfs, int success)
1499 {
1500         int shift = 0, quit = 0;
1501         __u64 tmp;
1502         spin_lock(&obd->obd_osfs_lock);
1503         memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1504         obd->obd_osfs_age = get_jiffies_64();
1505         spin_unlock(&obd->obd_osfs_lock);
1506
1507         if (success == 0) {
1508                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1509         } else {
1510                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1511                         /* assume all block sizes are always powers of 2 */
1512                         /* get the bits difference */
1513                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1514                         for (shift = 0; shift <= 64; ++shift) {
1515                                 if (tmp & 1) {
1516                                         if (quit)
1517                                                 break;
1518                                         else
1519                                                 quit = 1;
1520                                         shift = 0;
1521                                 }
1522                                 tmp >>= 1;
1523                         }
1524                 }
1525
1526                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1527                         osfs->os_bsize = lov_sfs->os_bsize;
1528
1529                         osfs->os_bfree  >>= shift;
1530                         osfs->os_bavail >>= shift;
1531                         osfs->os_blocks >>= shift;
1532                 } else if (shift != 0) {
1533                         lov_sfs->os_bfree  >>= shift;
1534                         lov_sfs->os_bavail >>= shift;
1535                         lov_sfs->os_blocks >>= shift;
1536                 }
1537 #ifdef MIN_DF
1538                 /* Sandia requested that df (and so, statfs) only
1539                    returned minimal available space on
1540                    a single OST, so people would be able to
1541                    write this much data guaranteed. */
1542                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1543                         /* Presumably if new bavail is smaller,
1544                            new bfree is bigger as well */
1545                         osfs->os_bfree = lov_sfs->os_bfree;
1546                         osfs->os_bavail = lov_sfs->os_bavail;
1547                 }
1548 #else
1549                 osfs->os_bfree += lov_sfs->os_bfree;
1550                 osfs->os_bavail += lov_sfs->os_bavail;
1551 #endif
1552                 osfs->os_blocks += lov_sfs->os_blocks;
1553                 /* XXX not sure about this one - depends on policy.
1554                  *   - could be minimum if we always stripe on all OBDs
1555                  *     (but that would be wrong for any other policy,
1556                  *     if one of the OBDs has no more objects left)
1557                  *   - could be sum if we stripe whole objects
1558                  *   - could be average, just to give a nice number
1559                  *
1560                  * To give a "reasonable" (if not wholly accurate)
1561                  * number, we divide the total number of free objects
1562                  * by expected stripe count (watch out for overflow).
1563                  */
1564                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1565                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1566         }
1567 }
1568
1569 /* The callback for osc_statfs_async that finilizes a request info when a
1570  * response is recieved. */
1571 static int cb_statfs_update(struct obd_info *oinfo, int rc)
1572 {
1573         struct lov_request *lovreq;
1574         struct obd_statfs *osfs, *lov_sfs;
1575         struct obd_device *obd;
1576         struct lov_obd *lov;
1577         int success;
1578         ENTRY;
1579
1580         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1581         lov = &lovreq->rq_rqset->set_obd->u.lov;
1582         obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1583
1584         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1585         lov_sfs = oinfo->oi_osfs;
1586
1587         success = lovreq->rq_rqset->set_success;
1588
1589         /* XXX: the same is done in lov_update_common_set, however
1590            lovset->set_exp is not initialized. */
1591         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1592         if (rc) {
1593                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1594                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1595                         rc = 0;
1596                 RETURN(rc);
1597         }
1598
1599         lov_update_statfs(obd, osfs, lov_sfs, success);
1600         qos_update(lov);
1601
1602         RETURN(0);
1603 }
1604
1605 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1606                         struct lov_request_set **reqset)
1607 {
1608         struct lov_request_set *set;
1609         struct lov_obd *lov = &obd->u.lov;
1610         int rc = 0, i;
1611         ENTRY;
1612
1613         OBD_ALLOC(set, sizeof(*set));
1614         if (set == NULL)
1615                 RETURN(-ENOMEM);
1616         lov_init_set(set);
1617
1618         set->set_obd = obd;
1619         set->set_oi = oinfo;
1620
1621         /* We only get block data from the OBD */
1622         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1623                 struct lov_request *req;
1624
1625                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
1626                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1627                         continue;
1628                 }
1629
1630                 OBD_ALLOC(req, sizeof(*req));
1631                 if (req == NULL)
1632                         GOTO(out_set, rc = -ENOMEM);
1633
1634                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1635                 if (req->rq_oi.oi_osfs == NULL) {
1636                         OBD_FREE(req, sizeof(*req));
1637                         GOTO(out_set, rc = -ENOMEM);
1638                 }
1639
1640                 req->rq_idx = i;
1641                 req->rq_oi.oi_cb_up = cb_statfs_update;
1642                 req->rq_rqset = set;
1643
1644                 lov_set_add_req(req, set);
1645         }
1646         if (!set->set_count)
1647                 GOTO(out_set, rc = -EIO);
1648         *reqset = set;
1649         RETURN(rc);
1650 out_set:
1651         lov_fini_statfs_set(set);
1652         RETURN(rc);
1653 }