Whamcloud - gitweb
b=20211
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_LOV
41
42 #ifdef __KERNEL__
43 #include <libcfs/libcfs.h>
44 #else
45 #include <liblustre.h>
46 #endif
47
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre/lustre_idl.h>
51
52 #include "lov_internal.h"
53
54 static void lov_init_set(struct lov_request_set *set)
55 {
56         set->set_count = 0;
57         set->set_completes = 0;
58         set->set_success = 0;
59         set->set_cookies = 0;
60         CFS_INIT_LIST_HEAD(&set->set_list);
61         atomic_set(&set->set_refcount, 1);
62         cfs_waitq_init(&set->set_waitq);
63 }
64
65 static void lov_finish_set(struct lov_request_set *set)
66 {
67         struct list_head *pos, *n;
68         ENTRY;
69
70         LASSERT(set);
71         list_for_each_safe(pos, n, &set->set_list) {
72                 struct lov_request *req = list_entry(pos, struct lov_request,
73                                                      rq_link);
74                 list_del_init(&req->rq_link);
75
76                 if (req->rq_oi.oi_oa)
77                         OBDO_FREE(req->rq_oi.oi_oa);
78                 if (req->rq_oi.oi_md)
79                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
80                 if (req->rq_oi.oi_osfs)
81                         OBD_FREE(req->rq_oi.oi_osfs,
82                                  sizeof(*req->rq_oi.oi_osfs));
83                 OBD_FREE(req, sizeof(*req));
84         }
85
86         if (set->set_pga) {
87                 int len = set->set_oabufs * sizeof(*set->set_pga);
88                 OBD_FREE(set->set_pga, len);
89         }
90         if (set->set_lockh)
91                 lov_llh_put(set->set_lockh);
92
93         OBD_FREE(set, sizeof(*set));
94         EXIT;
95 }
96
97 int lov_finished_set(struct lov_request_set *set)
98 {
99         CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
100                set->set_count);
101         return set->set_completes == set->set_count;
102 }
103
104
105 void lov_update_set(struct lov_request_set *set,
106                     struct lov_request *req, int rc)
107 {
108         req->rq_complete = 1;
109         req->rq_rc = rc;
110
111         set->set_completes++;
112         if (rc == 0)
113                 set->set_success++;
114
115         cfs_waitq_signal(&set->set_waitq);
116 }
117
118 int lov_update_common_set(struct lov_request_set *set,
119                           struct lov_request *req, int rc)
120 {
121         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
122         ENTRY;
123
124         lov_update_set(set, req, rc);
125
126         /* grace error on inactive ost */
127         if (rc && !(lov->lov_tgts[req->rq_idx] &&
128                     lov->lov_tgts[req->rq_idx]->ltd_active))
129                 rc = 0;
130
131         /* FIXME in raid1 regime, should return 0 */
132         RETURN(rc);
133 }
134
135 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
136 {
137         list_add_tail(&req->rq_link, &set->set_list);
138         set->set_count++;
139         req->rq_rqset = set;
140 }
141
142 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
143                                struct lov_oinfo *loi, int flags,
144                                struct ost_lvb *lvb, __u32 mode, int rc);
145
146 static int lov_update_enqueue_lov(struct obd_export *exp,
147                                   struct lustre_handle *lov_lockhp,
148                                   struct lov_oinfo *loi, int flags, int idx,
149                                   __u64 oid, int rc)
150 {
151         struct lov_obd *lov = &exp->exp_obd->u.lov;
152
153         if (rc != ELDLM_OK &&
154             !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
155                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
156                 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
157                         /* -EUSERS used by OST to report file contention */
158                         if (rc != -EINTR && rc != -EUSERS)
159                                 CERROR("enqueue objid "LPX64" subobj "
160                                        LPX64" on OST idx %d: rc %d\n",
161                                        oid, loi->loi_id, loi->loi_ost_idx, rc);
162                 } else
163                         rc = ELDLM_OK;
164         }
165         return rc;
166 }
167
168 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
169 {
170         struct lov_request_set *set = req->rq_rqset;
171         struct lustre_handle *lov_lockhp;
172         struct obd_info *oi = set->set_oi;
173         struct lov_oinfo *loi;
174         ENTRY;
175
176         LASSERT(oi != NULL);
177
178         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
179         loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
180
181         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
182          * and that copy can be arbitrarily out of date.
183          *
184          * The LOV API is due for a serious rewriting anyways, and this
185          * can be addressed then. */
186
187         lov_stripe_lock(oi->oi_md);
188         osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
189                            &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
190         if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
191                 memset(lov_lockhp, 0, sizeof *lov_lockhp);
192         rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
193                                     req->rq_idx, oi->oi_md->lsm_object_id, rc);
194         lov_stripe_unlock(oi->oi_md);
195         lov_update_set(set, req, rc);
196         RETURN(rc);
197 }
198
199 /* The callback for osc_enqueue that updates lov info for every OSC request. */
200 static int cb_update_enqueue(void *cookie, int rc)
201 {
202         struct obd_info *oinfo = cookie;
203         struct ldlm_enqueue_info *einfo;
204         struct lov_request *lovreq;
205
206         lovreq = container_of(oinfo, struct lov_request, rq_oi);
207         einfo = lovreq->rq_rqset->set_ei;
208         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
209 }
210
211 static int enqueue_done(struct lov_request_set *set, __u32 mode)
212 {
213         struct lov_request *req;
214         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
215         int rc = 0;
216         ENTRY;
217
218         /* enqueue/match success, just return */
219         if (set->set_completes && set->set_completes == set->set_success)
220                 RETURN(0);
221
222         /* cancel enqueued/matched locks */
223         list_for_each_entry(req, &set->set_list, rq_link) {
224                 struct lustre_handle *lov_lockhp;
225
226                 if (!req->rq_complete || req->rq_rc)
227                         continue;
228
229                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
230                 LASSERT(lov_lockhp);
231                 if (!lustre_handle_is_used(lov_lockhp))
232                         continue;
233
234                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
235                                 req->rq_oi.oi_md, mode, lov_lockhp);
236                 if (rc && lov->lov_tgts[req->rq_idx] &&
237                     lov->lov_tgts[req->rq_idx]->ltd_active)
238                         CERROR("cancelling obdjid "LPX64" on OST "
239                                "idx %d error: rc = %d\n",
240                                req->rq_oi.oi_md->lsm_object_id,
241                                req->rq_idx, rc);
242         }
243         if (set->set_lockh)
244                 lov_llh_put(set->set_lockh);
245         RETURN(rc);
246 }
247
248 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
249                          struct ptlrpc_request_set *rqset)
250 {
251         int ret = 0;
252         ENTRY;
253
254         if (set == NULL)
255                 RETURN(0);
256         LASSERT(set->set_exp);
257         /* Do enqueue_done only for sync requests and if any request
258          * succeeded. */
259         if (!rqset) {
260                 if (rc)
261                         set->set_completes = 0;
262                 ret = enqueue_done(set, mode);
263         } else if (set->set_lockh)
264                 lov_llh_put(set->set_lockh);
265
266         if (atomic_dec_and_test(&set->set_refcount))
267                 lov_finish_set(set);
268
269         RETURN(rc ? rc : ret);
270 }
271
272 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
273                          struct ldlm_enqueue_info *einfo,
274                          struct lov_request_set **reqset)
275 {
276         struct lov_obd *lov = &exp->exp_obd->u.lov;
277         struct lov_request_set *set;
278         int i, rc = 0;
279         ENTRY;
280
281         OBD_ALLOC(set, sizeof(*set));
282         if (set == NULL)
283                 RETURN(-ENOMEM);
284         lov_init_set(set);
285
286         set->set_exp = exp;
287         set->set_oi = oinfo;
288         set->set_ei = einfo;
289         set->set_lockh = lov_llh_new(oinfo->oi_md);
290         if (set->set_lockh == NULL)
291                 GOTO(out_set, rc = -ENOMEM);
292         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
293
294         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
295                 struct lov_oinfo *loi;
296                 struct lov_request *req;
297                 obd_off start, end;
298
299                 loi = oinfo->oi_md->lsm_oinfo[i];
300                 if (!lov_stripe_intersects(oinfo->oi_md, i,
301                                            oinfo->oi_policy.l_extent.start,
302                                            oinfo->oi_policy.l_extent.end,
303                                            &start, &end))
304                         continue;
305
306                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
307                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
308                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
309                         continue;
310                 }
311
312                 OBD_ALLOC(req, sizeof(*req));
313                 if (req == NULL)
314                         GOTO(out_set, rc = -ENOMEM);
315
316                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
317                         sizeof(struct lov_oinfo *) +
318                         sizeof(struct lov_oinfo);
319                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
320                 if (req->rq_oi.oi_md == NULL) {
321                         OBD_FREE(req, sizeof(*req));
322                         GOTO(out_set, rc = -ENOMEM);
323                 }
324                 req->rq_oi.oi_md->lsm_oinfo[0] =
325                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
326                         sizeof(struct lov_oinfo *);
327
328                 /* Set lov request specific parameters. */
329                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
330                 req->rq_oi.oi_cb_up = cb_update_enqueue;
331                 req->rq_oi.oi_flags = oinfo->oi_flags;
332
333                 LASSERT(req->rq_oi.oi_lockh);
334
335                 req->rq_oi.oi_policy.l_extent.gid =
336                         oinfo->oi_policy.l_extent.gid;
337                 req->rq_oi.oi_policy.l_extent.start = start;
338                 req->rq_oi.oi_policy.l_extent.end = end;
339
340                 req->rq_idx = loi->loi_ost_idx;
341                 req->rq_stripe = i;
342
343                 /* XXX LOV STACKING: submd should be from the subobj */
344                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
345                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
346                 req->rq_oi.oi_md->lsm_stripe_count = 0;
347                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
348                         loi->loi_kms_valid;
349                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
350                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
351
352                 lov_set_add_req(req, set);
353         }
354         if (!set->set_count)
355                 GOTO(out_set, rc = -EIO);
356         *reqset = set;
357         RETURN(0);
358 out_set:
359         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
360         RETURN(rc);
361 }
362
363 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
364                          int rc)
365 {
366         int ret = rc;
367         ENTRY;
368
369         if (rc > 0)
370                 ret = 0;
371         else if (rc == 0)
372                 ret = 1;
373         lov_update_set(set, req, ret);
374         RETURN(rc);
375 }
376
377 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
378 {
379         int rc = 0;
380         ENTRY;
381
382         if (set == NULL)
383                 RETURN(0);
384         LASSERT(set->set_exp);
385         rc = enqueue_done(set, mode);
386         if ((set->set_count == set->set_success) &&
387             (flags & LDLM_FL_TEST_LOCK))
388                 lov_llh_put(set->set_lockh);
389
390         if (atomic_dec_and_test(&set->set_refcount))
391                 lov_finish_set(set);
392
393         RETURN(rc);
394 }
395
396 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
397                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
398                        __u32 mode, struct lustre_handle *lockh,
399                        struct lov_request_set **reqset)
400 {
401         struct lov_obd *lov = &exp->exp_obd->u.lov;
402         struct lov_request_set *set;
403         int i, rc = 0;
404         ENTRY;
405
406         OBD_ALLOC(set, sizeof(*set));
407         if (set == NULL)
408                 RETURN(-ENOMEM);
409         lov_init_set(set);
410
411         set->set_exp = exp;
412         set->set_oi = oinfo;
413         set->set_oi->oi_md = lsm;
414         set->set_lockh = lov_llh_new(lsm);
415         if (set->set_lockh == NULL)
416                 GOTO(out_set, rc = -ENOMEM);
417         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
418
419         for (i = 0; i < lsm->lsm_stripe_count; i++){
420                 struct lov_oinfo *loi;
421                 struct lov_request *req;
422                 obd_off start, end;
423
424                 loi = lsm->lsm_oinfo[i];
425                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
426                                            policy->l_extent.end, &start, &end))
427                         continue;
428
429                 /* FIXME raid1 should grace this error */
430                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
431                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
432                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
433                         GOTO(out_set, rc = -EIO);
434                 }
435
436                 OBD_ALLOC(req, sizeof(*req));
437                 if (req == NULL)
438                         GOTO(out_set, rc = -ENOMEM);
439
440                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
441                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
442                 if (req->rq_oi.oi_md == NULL) {
443                         OBD_FREE(req, sizeof(*req));
444                         GOTO(out_set, rc = -ENOMEM);
445                 }
446
447                 req->rq_oi.oi_policy.l_extent.start = start;
448                 req->rq_oi.oi_policy.l_extent.end = end;
449                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
450
451                 req->rq_idx = loi->loi_ost_idx;
452                 req->rq_stripe = i;
453
454                 /* XXX LOV STACKING: submd should be from the subobj */
455                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
456                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
457                 req->rq_oi.oi_md->lsm_stripe_count = 0;
458
459                 lov_set_add_req(req, set);
460         }
461         if (!set->set_count)
462                 GOTO(out_set, rc = -EIO);
463         *reqset = set;
464         RETURN(rc);
465 out_set:
466         lov_fini_match_set(set, mode, 0);
467         RETURN(rc);
468 }
469
470 int lov_fini_cancel_set(struct lov_request_set *set)
471 {
472         int rc = 0;
473         ENTRY;
474
475         if (set == NULL)
476                 RETURN(0);
477
478         LASSERT(set->set_exp);
479         if (set->set_lockh)
480                 lov_llh_put(set->set_lockh);
481
482         if (atomic_dec_and_test(&set->set_refcount))
483                 lov_finish_set(set);
484
485         RETURN(rc);
486 }
487
488 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
489                         struct lov_stripe_md *lsm, __u32 mode,
490                         struct lustre_handle *lockh,
491                         struct lov_request_set **reqset)
492 {
493         struct lov_request_set *set;
494         int i, rc = 0;
495         ENTRY;
496
497         OBD_ALLOC(set, sizeof(*set));
498         if (set == NULL)
499                 RETURN(-ENOMEM);
500         lov_init_set(set);
501
502         set->set_exp = exp;
503         set->set_oi = oinfo;
504         set->set_oi->oi_md = lsm;
505         set->set_lockh = lov_handle2llh(lockh);
506         if (set->set_lockh == NULL) {
507                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
508                 GOTO(out_set, rc = -EINVAL);
509         }
510         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
511
512         for (i = 0; i < lsm->lsm_stripe_count; i++){
513                 struct lov_request *req;
514                 struct lustre_handle *lov_lockhp;
515                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
516
517                 lov_lockhp = set->set_lockh->llh_handles + i;
518                 if (!lustre_handle_is_used(lov_lockhp)) {
519                         CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n",
520                                loi->loi_ost_idx, loi->loi_id);
521                         continue;
522                 }
523
524                 OBD_ALLOC(req, sizeof(*req));
525                 if (req == NULL)
526                         GOTO(out_set, rc = -ENOMEM);
527
528                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
529                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
530                 if (req->rq_oi.oi_md == NULL) {
531                         OBD_FREE(req, sizeof(*req));
532                         GOTO(out_set, rc = -ENOMEM);
533                 }
534
535                 req->rq_idx = loi->loi_ost_idx;
536                 req->rq_stripe = i;
537
538                 /* XXX LOV STACKING: submd should be from the subobj */
539                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
540                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
541                 req->rq_oi.oi_md->lsm_stripe_count = 0;
542
543                 lov_set_add_req(req, set);
544         }
545         if (!set->set_count)
546                 GOTO(out_set, rc = -EIO);
547         *reqset = set;
548         RETURN(rc);
549 out_set:
550         lov_fini_cancel_set(set);
551         RETURN(rc);
552 }
553
554 static int create_done(struct obd_export *exp, struct lov_request_set *set,
555                        struct lov_stripe_md **lsmp)
556 {
557         struct lov_obd *lov = &exp->exp_obd->u.lov;
558         struct obd_trans_info *oti = set->set_oti;
559         struct obdo *src_oa = set->set_oi->oi_oa;
560         struct lov_request *req;
561         struct obdo *ret_oa = NULL;
562         int attrset = 0, rc = 0;
563         ENTRY;
564
565         LASSERT(set->set_completes);
566
567         /* try alloc objects on other osts if osc_create fails for
568          * exceptions: RPC failure, ENOSPC, etc */
569         if (set->set_count != set->set_success) {
570                 list_for_each_entry (req, &set->set_list, rq_link) {
571                         if (req->rq_rc == 0)
572                                 continue;
573
574                         set->set_completes--;
575                         req->rq_complete = 0;
576
577                         rc = qos_remedy_create(set, req);
578                         lov_update_create_set(set, req, rc);
579                 }
580         }
581
582         /* no successful creates */
583         if (set->set_success == 0)
584                 GOTO(cleanup, rc);
585
586         if (set->set_count != set->set_success) {
587                 set->set_count = set->set_success;
588                 qos_shrink_lsm(set);
589         }
590
591         OBDO_ALLOC(ret_oa);
592         if (ret_oa == NULL)
593                 GOTO(cleanup, rc = -ENOMEM);
594
595         list_for_each_entry(req, &set->set_list, rq_link) {
596                 if (!req->rq_complete || req->rq_rc)
597                         continue;
598                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
599                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
600                                 req->rq_stripe, &attrset);
601         }
602         if (src_oa->o_valid & OBD_MD_FLSIZE &&
603             ret_oa->o_size != src_oa->o_size) {
604                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
605                        src_oa->o_size, ret_oa->o_size);
606                 LBUG();
607         }
608         ret_oa->o_id = src_oa->o_id;
609         ret_oa->o_gr = src_oa->o_gr;
610         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
611         memcpy(src_oa, ret_oa, sizeof(*src_oa));
612         OBDO_FREE(ret_oa);
613
614         *lsmp = set->set_oi->oi_md;
615         GOTO(done, rc = 0);
616
617 cleanup:
618         list_for_each_entry(req, &set->set_list, rq_link) {
619                 struct obd_export *sub_exp;
620                 int err = 0;
621
622                 if (!req->rq_complete || req->rq_rc)
623                         continue;
624
625                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
626                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
627                                   NULL);
628                 if (err)
629                         CERROR("Failed to uncreate objid "LPX64" subobj "
630                                LPX64" on OST idx %d: rc = %d\n",
631                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
632                                req->rq_idx, rc);
633         }
634         if (*lsmp == NULL)
635                 obd_free_memmd(exp, &set->set_oi->oi_md);
636 done:
637         if (oti && set->set_cookies) {
638                 oti->oti_logcookies = set->set_cookies;
639                 if (!set->set_cookie_sent) {
640                         oti_free_cookies(oti);
641                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
642                 } else {
643                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
644                 }
645         }
646         RETURN(rc);
647 }
648
649 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
650 {
651         int rc = 0;
652         ENTRY;
653
654         if (set == NULL)
655                 RETURN(0);
656         LASSERT(set->set_exp);
657         if (set->set_completes)
658                 rc = create_done(set->set_exp, set, lsmp);
659
660         if (atomic_dec_and_test(&set->set_refcount))
661                 lov_finish_set(set);
662
663         RETURN(rc);
664 }
665
666 int lov_update_create_set(struct lov_request_set *set,
667                           struct lov_request *req, int rc)
668 {
669         struct obd_trans_info *oti = set->set_oti;
670         struct lov_stripe_md *lsm = set->set_oi->oi_md;
671         struct lov_oinfo *loi;
672         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
673         ENTRY;
674
675         req->rq_stripe = set->set_success;
676         loi = lsm->lsm_oinfo[req->rq_stripe];
677
678         if (rc && lov->lov_tgts[req->rq_idx] &&
679             lov->lov_tgts[req->rq_idx]->ltd_active) {
680                 CERROR("error creating fid "LPX64" sub-object"
681                        " on OST idx %d/%d: rc = %d\n",
682                        set->set_oi->oi_oa->o_id, req->rq_idx,
683                        lsm->lsm_stripe_count, rc);
684                 if (rc > 0) {
685                         CERROR("obd_create returned invalid err %d\n", rc);
686                         rc = -EIO;
687                 }
688         }
689         if (rc)
690                 GOTO(out, rc);
691
692         loi->loi_id = req->rq_oi.oi_oa->o_id;
693         loi->loi_gr = req->rq_oi.oi_oa->o_gr;
694         loi->loi_ost_idx = req->rq_idx;
695         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
696                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
697         loi_init(loi);
698
699         if (oti && set->set_cookies)
700                 ++oti->oti_logcookies;
701         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
702                 set->set_cookie_sent++;
703
704 out:
705         lov_update_set(set, req, rc);
706         RETURN(rc);
707 }
708
709 int cb_create_update(void *cookie, int rc)
710 {
711         struct obd_info *oinfo = cookie;
712         struct lov_request *lovreq;
713
714         lovreq = container_of(oinfo, struct lov_request, rq_oi);
715         return lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
716 }
717
718
719 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
720                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
721                         struct obd_trans_info *oti,
722                         struct lov_request_set **reqset)
723 {
724         struct lov_request_set *set;
725         int rc = 0;
726         ENTRY;
727
728         OBD_ALLOC(set, sizeof(*set));
729         if (set == NULL)
730                 RETURN(-ENOMEM);
731         lov_init_set(set);
732
733         set->set_exp = exp;
734         set->set_oi = oinfo;
735         set->set_oi->oi_md = *lsmp;
736         set->set_oi->oi_oa = src_oa;
737         set->set_oti = oti;
738
739         rc = qos_prep_create(exp, set);
740         /* qos_shrink_lsm() may have allocated a new lsm */
741         *lsmp = oinfo->oi_md;
742         if (rc)
743                 lov_fini_create_set(set, lsmp);
744         else
745                 *reqset = set;
746         RETURN(rc);
747 }
748
749 static int common_attr_done(struct lov_request_set *set)
750 {
751         struct list_head *pos;
752         struct lov_request *req;
753         struct obdo *tmp_oa;
754         int rc = 0, attrset = 0;
755         ENTRY;
756
757         LASSERT(set->set_oi != NULL);
758
759         if (set->set_oi->oi_oa == NULL)
760                 RETURN(0);
761
762         if (!set->set_success)
763                 RETURN(-EIO);
764
765         OBDO_ALLOC(tmp_oa);
766         if (tmp_oa == NULL)
767                 GOTO(out, rc = -ENOMEM);
768
769         list_for_each (pos, &set->set_list) {
770                 req = list_entry(pos, struct lov_request, rq_link);
771
772                 if (!req->rq_complete || req->rq_rc)
773                         continue;
774                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
775                         continue;
776                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
777                                 req->rq_oi.oi_oa->o_valid,
778                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
779         }
780         if (!attrset) {
781                 CERROR("No stripes had valid attrs\n");
782                 rc = -EIO;
783         }
784         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
785         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
786 out:
787         if (tmp_oa)
788                 OBDO_FREE(tmp_oa);
789         RETURN(rc);
790
791 }
792
793 static int brw_done(struct lov_request_set *set)
794 {
795         struct lov_stripe_md *lsm = set->set_oi->oi_md;
796         struct lov_oinfo     *loi = NULL;
797         struct list_head *pos;
798         struct lov_request *req;
799         ENTRY;
800
801         list_for_each (pos, &set->set_list) {
802                 req = list_entry(pos, struct lov_request, rq_link);
803
804                 if (!req->rq_complete || req->rq_rc)
805                         continue;
806
807                 loi = lsm->lsm_oinfo[req->rq_stripe];
808
809                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
810                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
811         }
812
813         RETURN(0);
814 }
815
816 int lov_fini_brw_set(struct lov_request_set *set)
817 {
818         int rc = 0;
819         ENTRY;
820
821         if (set == NULL)
822                 RETURN(0);
823         LASSERT(set->set_exp);
824         if (set->set_completes) {
825                 rc = brw_done(set);
826                 /* FIXME update qos data here */
827         }
828         if (atomic_dec_and_test(&set->set_refcount))
829                 lov_finish_set(set);
830
831         RETURN(rc);
832 }
833
834 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
835                      obd_count oa_bufs, struct brw_page *pga,
836                      struct obd_trans_info *oti,
837                      struct lov_request_set **reqset)
838 {
839         struct {
840                 obd_count       index;
841                 obd_count       count;
842                 obd_count       off;
843         } *info = NULL;
844         struct lov_request_set *set;
845         struct lov_obd *lov = &exp->exp_obd->u.lov;
846         int rc = 0, i, shift;
847         ENTRY;
848
849         OBD_ALLOC(set, sizeof(*set));
850         if (set == NULL)
851                 RETURN(-ENOMEM);
852         lov_init_set(set);
853
854         set->set_exp = exp;
855         set->set_oti = oti;
856         set->set_oi = oinfo;
857         set->set_oabufs = oa_bufs;
858         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
859         if (!set->set_pga)
860                 GOTO(out, rc = -ENOMEM);
861
862         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
863         if (!info)
864                 GOTO(out, rc = -ENOMEM);
865
866         /* calculate the page count for each stripe */
867         for (i = 0; i < oa_bufs; i++) {
868                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
869                 info[stripe].count++;
870         }
871
872         /* alloc and initialize lov request */
873         shift = 0;
874         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
875                 struct lov_oinfo *loi = NULL;
876                 struct lov_request *req;
877
878                 if (info[i].count == 0)
879                         continue;
880
881                 loi = oinfo->oi_md->lsm_oinfo[i];
882                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
883                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
884                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
885                         GOTO(out, rc = -EIO);
886                 }
887
888                 OBD_ALLOC(req, sizeof(*req));
889                 if (req == NULL)
890                         GOTO(out, rc = -ENOMEM);
891
892                 OBDO_ALLOC(req->rq_oi.oi_oa);
893                 if (req->rq_oi.oi_oa == NULL) {
894                         OBD_FREE(req, sizeof(*req));
895                         GOTO(out, rc = -ENOMEM);
896                 }
897
898                 if (oinfo->oi_oa) {
899                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
900                                sizeof(*req->rq_oi.oi_oa));
901                 }
902                 req->rq_oi.oi_oa->o_id = loi->loi_id;
903                 req->rq_oi.oi_oa->o_stripe_idx = i;
904
905                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
906                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
907                 if (req->rq_oi.oi_md == NULL) {
908                         OBDO_FREE(req->rq_oi.oi_oa);
909                         OBD_FREE(req, sizeof(*req));
910                         GOTO(out, rc = -ENOMEM);
911                 }
912
913                 req->rq_idx = loi->loi_ost_idx;
914                 req->rq_stripe = i;
915
916                 /* XXX LOV STACKING */
917                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
918                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
919                 req->rq_oabufs = info[i].count;
920                 req->rq_pgaidx = shift;
921                 shift += req->rq_oabufs;
922
923                 /* remember the index for sort brw_page array */
924                 info[i].index = req->rq_pgaidx;
925
926                 req->rq_oi.oi_capa = oinfo->oi_capa;
927
928                 lov_set_add_req(req, set);
929         }
930         if (!set->set_count)
931                 GOTO(out, rc = -EIO);
932
933         /* rotate & sort the brw_page array */
934         for (i = 0; i < oa_bufs; i++) {
935                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
936
937                 shift = info[stripe].index + info[stripe].off;
938                 LASSERT(shift < oa_bufs);
939                 set->set_pga[shift] = pga[i];
940                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
941                                   &set->set_pga[shift].off);
942                 info[stripe].off++;
943         }
944 out:
945         if (info)
946                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
947
948         if (rc == 0)
949                 *reqset = set;
950         else
951                 lov_fini_brw_set(set);
952
953         RETURN(rc);
954 }
955
956 int lov_fini_getattr_set(struct lov_request_set *set)
957 {
958         int rc = 0;
959         ENTRY;
960
961         if (set == NULL)
962                 RETURN(0);
963         LASSERT(set->set_exp);
964         if (set->set_completes)
965                 rc = common_attr_done(set);
966
967         if (atomic_dec_and_test(&set->set_refcount))
968                 lov_finish_set(set);
969
970         RETURN(rc);
971 }
972
973 /* The callback for osc_getattr_async that finilizes a request info when a
974  * response is received. */
975 static int cb_getattr_update(void *cookie, int rc)
976 {
977         struct obd_info *oinfo = cookie;
978         struct lov_request *lovreq;
979         lovreq = container_of(oinfo, struct lov_request, rq_oi);
980         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
981 }
982
983 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
984                          struct lov_request_set **reqset)
985 {
986         struct lov_request_set *set;
987         struct lov_obd *lov = &exp->exp_obd->u.lov;
988         int rc = 0, i;
989         ENTRY;
990
991         OBD_ALLOC(set, sizeof(*set));
992         if (set == NULL)
993                 RETURN(-ENOMEM);
994         lov_init_set(set);
995
996         set->set_exp = exp;
997         set->set_oi = oinfo;
998
999         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1000                 struct lov_oinfo *loi;
1001                 struct lov_request *req;
1002
1003                 loi = oinfo->oi_md->lsm_oinfo[i];
1004                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1005                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1006                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1007                         continue;
1008                 }
1009
1010                 OBD_ALLOC(req, sizeof(*req));
1011                 if (req == NULL)
1012                         GOTO(out_set, rc = -ENOMEM);
1013
1014                 req->rq_stripe = i;
1015                 req->rq_idx = loi->loi_ost_idx;
1016
1017                 OBDO_ALLOC(req->rq_oi.oi_oa);
1018                 if (req->rq_oi.oi_oa == NULL) {
1019                         OBD_FREE(req, sizeof(*req));
1020                         GOTO(out_set, rc = -ENOMEM);
1021                 }
1022                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1023                        sizeof(*req->rq_oi.oi_oa));
1024                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1025                 req->rq_oi.oi_cb_up = cb_getattr_update;
1026                 req->rq_oi.oi_capa = oinfo->oi_capa;
1027
1028                 lov_set_add_req(req, set);
1029         }
1030         if (!set->set_count)
1031                 GOTO(out_set, rc = -EIO);
1032         *reqset = set;
1033         RETURN(rc);
1034 out_set:
1035         lov_fini_getattr_set(set);
1036         RETURN(rc);
1037 }
1038
1039 int lov_fini_destroy_set(struct lov_request_set *set)
1040 {
1041         ENTRY;
1042
1043         if (set == NULL)
1044                 RETURN(0);
1045         LASSERT(set->set_exp);
1046         if (set->set_completes) {
1047                 /* FIXME update qos data here */
1048         }
1049
1050         if (atomic_dec_and_test(&set->set_refcount))
1051                 lov_finish_set(set);
1052
1053         RETURN(0);
1054 }
1055
1056 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1057                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1058                          struct obd_trans_info *oti,
1059                          struct lov_request_set **reqset)
1060 {
1061         struct lov_request_set *set;
1062         struct lov_obd *lov = &exp->exp_obd->u.lov;
1063         int rc = 0, i;
1064         ENTRY;
1065
1066         OBD_ALLOC(set, sizeof(*set));
1067         if (set == NULL)
1068                 RETURN(-ENOMEM);
1069         lov_init_set(set);
1070
1071         set->set_exp = exp;
1072         set->set_oi = oinfo;
1073         set->set_oi->oi_md = lsm;
1074         set->set_oi->oi_oa = src_oa;
1075         set->set_oti = oti;
1076         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1077                 set->set_cookies = oti->oti_logcookies;
1078
1079         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1080                 struct lov_oinfo *loi;
1081                 struct lov_request *req;
1082
1083                 loi = lsm->lsm_oinfo[i];
1084                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1085                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1086                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1087                         continue;
1088                 }
1089
1090                 OBD_ALLOC(req, sizeof(*req));
1091                 if (req == NULL)
1092                         GOTO(out_set, rc = -ENOMEM);
1093
1094                 req->rq_stripe = i;
1095                 req->rq_idx = loi->loi_ost_idx;
1096
1097                 OBDO_ALLOC(req->rq_oi.oi_oa);
1098                 if (req->rq_oi.oi_oa == NULL) {
1099                         OBD_FREE(req, sizeof(*req));
1100                         GOTO(out_set, rc = -ENOMEM);
1101                 }
1102                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1103                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1104                 lov_set_add_req(req, set);
1105         }
1106         if (!set->set_count)
1107                 GOTO(out_set, rc = -EIO);
1108         *reqset = set;
1109         RETURN(rc);
1110 out_set:
1111         lov_fini_destroy_set(set);
1112         RETURN(rc);
1113 }
1114
1115 int lov_fini_setattr_set(struct lov_request_set *set)
1116 {
1117         int rc = 0;
1118         ENTRY;
1119
1120         if (set == NULL)
1121                 RETURN(0);
1122         LASSERT(set->set_exp);
1123         if (set->set_completes) {
1124                 rc = common_attr_done(set);
1125                 /* FIXME update qos data here */
1126         }
1127
1128         if (atomic_dec_and_test(&set->set_refcount))
1129                 lov_finish_set(set);
1130         RETURN(rc);
1131 }
1132
1133 int lov_update_setattr_set(struct lov_request_set *set,
1134                            struct lov_request *req, int rc)
1135 {
1136         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1137         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1138         ENTRY;
1139
1140         lov_update_set(set, req, rc);
1141
1142         /* grace error on inactive ost */
1143         if (rc && !(lov->lov_tgts[req->rq_idx] &&
1144                     lov->lov_tgts[req->rq_idx]->ltd_active))
1145                 rc = 0;
1146
1147         if (rc == 0) {
1148                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1149                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1150                                 req->rq_oi.oi_oa->o_ctime;
1151                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1152                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1153                                 req->rq_oi.oi_oa->o_mtime;
1154                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1155                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1156                                 req->rq_oi.oi_oa->o_atime;
1157         }
1158
1159         RETURN(rc);
1160 }
1161
1162 /* The callback for osc_setattr_async that finilizes a request info when a
1163  * response is received. */
1164 static int cb_setattr_update(void *cookie, int rc)
1165 {
1166         struct obd_info *oinfo = cookie;
1167         struct lov_request *lovreq;
1168         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1169         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1170 }
1171
1172 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1173                          struct obd_trans_info *oti,
1174                          struct lov_request_set **reqset)
1175 {
1176         struct lov_request_set *set;
1177         struct lov_obd *lov = &exp->exp_obd->u.lov;
1178         int rc = 0, i;
1179         ENTRY;
1180
1181         OBD_ALLOC(set, sizeof(*set));
1182         if (set == NULL)
1183                 RETURN(-ENOMEM);
1184         lov_init_set(set);
1185
1186         set->set_exp = exp;
1187         set->set_oti = oti;
1188         set->set_oi = oinfo;
1189         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1190                 set->set_cookies = oti->oti_logcookies;
1191
1192         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1193                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1194                 struct lov_request *req;
1195
1196                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1197                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1198                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1199                         continue;
1200                 }
1201
1202                 OBD_ALLOC(req, sizeof(*req));
1203                 if (req == NULL)
1204                         GOTO(out_set, rc = -ENOMEM);
1205                 req->rq_stripe = i;
1206                 req->rq_idx = loi->loi_ost_idx;
1207
1208                 OBDO_ALLOC(req->rq_oi.oi_oa);
1209                 if (req->rq_oi.oi_oa == NULL) {
1210                         OBD_FREE(req, sizeof(*req));
1211                         GOTO(out_set, rc = -ENOMEM);
1212                 }
1213                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1214                        sizeof(*req->rq_oi.oi_oa));
1215                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1216                 LASSERTF(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) ||
1217                          CHECK_MDS_GROUP(req->rq_oi.oi_oa->o_gr),
1218                          "req->rq_oi.oi_oa->o_valid="LPX64" "
1219                          "req->rq_oi.oi_oa->o_gr="LPU64"\n",
1220                          req->rq_oi.oi_oa->o_valid, req->rq_oi.oi_oa->o_gr);
1221                 req->rq_oi.oi_oa->o_stripe_idx = i;
1222                 req->rq_oi.oi_cb_up = cb_setattr_update;
1223                 req->rq_oi.oi_capa = oinfo->oi_capa;
1224
1225                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1226                         int off = lov_stripe_offset(oinfo->oi_md,
1227                                                     oinfo->oi_oa->o_size, i,
1228                                                     &req->rq_oi.oi_oa->o_size);
1229
1230                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1231                                 req->rq_oi.oi_oa->o_size--;
1232
1233                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1234                                i, req->rq_oi.oi_oa->o_size,
1235                                oinfo->oi_oa->o_size);
1236                 }
1237                 lov_set_add_req(req, set);
1238         }
1239         if (!set->set_count)
1240                 GOTO(out_set, rc = -EIO);
1241         *reqset = set;
1242         RETURN(rc);
1243 out_set:
1244         lov_fini_setattr_set(set);
1245         RETURN(rc);
1246 }
1247
1248 int lov_fini_punch_set(struct lov_request_set *set)
1249 {
1250         int rc = 0;
1251         ENTRY;
1252
1253         if (set == NULL)
1254                 RETURN(0);
1255         LASSERT(set->set_exp);
1256         if (set->set_completes) {
1257                 rc = -EIO;
1258                 /* FIXME update qos data here */
1259                 if (set->set_success)
1260                         rc = common_attr_done(set);
1261         }
1262
1263         if (atomic_dec_and_test(&set->set_refcount))
1264                 lov_finish_set(set);
1265
1266         RETURN(rc);
1267 }
1268
1269 int lov_update_punch_set(struct lov_request_set *set,
1270                          struct lov_request *req, int rc)
1271 {
1272         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1273         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1274         ENTRY;
1275
1276         lov_update_set(set, req, rc);
1277
1278         /* grace error on inactive ost */
1279         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1280                 rc = 0;
1281
1282         if (rc == 0) {
1283                 lov_stripe_lock(lsm);
1284                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1285                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1286                                 req->rq_oi.oi_oa->o_blocks;
1287                 }
1288
1289                 /* Do we need to update lvb_size here? It needn't because
1290                  * it have been done in ll_truncate(). -jay */
1291                 lov_stripe_unlock(lsm);
1292         }
1293
1294         RETURN(rc);
1295 }
1296
1297 /* The callback for osc_punch that finilizes a request info when a response
1298  * is received. */
1299 static int cb_update_punch(void *cookie, int rc)
1300 {
1301         struct obd_info *oinfo = cookie;
1302         struct lov_request *lovreq;
1303         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1304         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1305 }
1306
1307 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1308                        struct obd_trans_info *oti,
1309                        struct lov_request_set **reqset)
1310 {
1311         struct lov_request_set *set;
1312         struct lov_obd *lov = &exp->exp_obd->u.lov;
1313         int rc = 0, i;
1314         ENTRY;
1315
1316         OBD_ALLOC(set, sizeof(*set));
1317         if (set == NULL)
1318                 RETURN(-ENOMEM);
1319         lov_init_set(set);
1320
1321         set->set_oi = oinfo;
1322         set->set_exp = exp;
1323
1324         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1325                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1326                 struct lov_request *req;
1327                 obd_off rs, re;
1328
1329                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1330                                            oinfo->oi_policy.l_extent.start,
1331                                            oinfo->oi_policy.l_extent.end,
1332                                            &rs, &re))
1333                         continue;
1334
1335                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1336                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1337                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1338                         GOTO(out_set, rc = -EIO);
1339                 }
1340
1341                 OBD_ALLOC(req, sizeof(*req));
1342                 if (req == NULL)
1343                         GOTO(out_set, rc = -ENOMEM);
1344                 req->rq_stripe = i;
1345                 req->rq_idx = loi->loi_ost_idx;
1346
1347                 OBDO_ALLOC(req->rq_oi.oi_oa);
1348                 if (req->rq_oi.oi_oa == NULL) {
1349                         OBD_FREE(req, sizeof(*req));
1350                         GOTO(out_set, rc = -ENOMEM);
1351                 }
1352                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1353                        sizeof(*req->rq_oi.oi_oa));
1354                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1355                 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1356                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1357
1358                 req->rq_oi.oi_oa->o_stripe_idx = i;
1359                 req->rq_oi.oi_cb_up = cb_update_punch;
1360
1361                 req->rq_oi.oi_policy.l_extent.start = rs;
1362                 req->rq_oi.oi_policy.l_extent.end = re;
1363                 req->rq_oi.oi_policy.l_extent.gid = -1;
1364
1365                 req->rq_oi.oi_capa = oinfo->oi_capa;
1366
1367                 lov_set_add_req(req, set);
1368         }
1369         if (!set->set_count)
1370                 GOTO(out_set, rc = -EIO);
1371         *reqset = set;
1372         RETURN(rc);
1373 out_set:
1374         lov_fini_punch_set(set);
1375         RETURN(rc);
1376 }
1377
1378 int lov_fini_sync_set(struct lov_request_set *set)
1379 {
1380         int rc = 0;
1381         ENTRY;
1382
1383         if (set == NULL)
1384                 RETURN(0);
1385         LASSERT(set->set_exp);
1386         if (set->set_completes) {
1387                 if (!set->set_success)
1388                         rc = -EIO;
1389                 /* FIXME update qos data here */
1390         }
1391
1392         if (atomic_dec_and_test(&set->set_refcount))
1393                 lov_finish_set(set);
1394
1395         RETURN(rc);
1396 }
1397
1398 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1399                       struct obdo *src_oa, struct lov_stripe_md *lsm,
1400                       obd_off start, obd_off end,
1401                       struct lov_request_set **reqset)
1402 {
1403         struct lov_request_set *set;
1404         struct lov_obd *lov = &exp->exp_obd->u.lov;
1405         int rc = 0, i;
1406         ENTRY;
1407
1408         OBD_ALLOC(set, sizeof(*set));
1409         if (set == NULL)
1410                 RETURN(-ENOMEM);
1411         lov_init_set(set);
1412
1413         set->set_exp = exp;
1414         set->set_oi = oinfo;
1415         set->set_oi->oi_md = lsm;
1416         set->set_oi->oi_oa = src_oa;
1417
1418         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1419                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1420                 struct lov_request *req;
1421                 obd_off rs, re;
1422
1423                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1424                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1425                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1426                         continue;
1427                 }
1428
1429                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1430                         continue;
1431
1432                 OBD_ALLOC(req, sizeof(*req));
1433                 if (req == NULL)
1434                         GOTO(out_set, rc = -ENOMEM);
1435                 req->rq_stripe = i;
1436                 req->rq_idx = loi->loi_ost_idx;
1437
1438                 OBDO_ALLOC(req->rq_oi.oi_oa);
1439                 if (req->rq_oi.oi_oa == NULL) {
1440                         OBD_FREE(req, sizeof(*req));
1441                         GOTO(out_set, rc = -ENOMEM);
1442                 }
1443                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1444                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1445                 req->rq_oi.oi_oa->o_stripe_idx = i;
1446
1447                 req->rq_oi.oi_policy.l_extent.start = rs;
1448                 req->rq_oi.oi_policy.l_extent.end = re;
1449                 req->rq_oi.oi_policy.l_extent.gid = -1;
1450
1451                 lov_set_add_req(req, set);
1452         }
1453         if (!set->set_count)
1454                 GOTO(out_set, rc = -EIO);
1455         *reqset = set;
1456         RETURN(rc);
1457 out_set:
1458         lov_fini_sync_set(set);
1459         RETURN(rc);
1460 }
1461
/* All-ones value of a __u64. */
#define LOV_U64_MAX ((__u64)~0ULL)

/*
 * Add \a add to \a tot, pinning \a tot at LOV_U64_MAX when the sum would
 * wrap around.  Both arguments are evaluated more than once, so they must
 * be free of side effects.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while(0)
1470
1471 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1472 {
1473         ENTRY;
1474
1475         if (success) {
1476                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1477
1478                 if (osfs->os_files != LOV_U64_MAX)
1479                         do_div(osfs->os_files, expected_stripes);
1480                 if (osfs->os_ffree != LOV_U64_MAX)
1481                         do_div(osfs->os_ffree, expected_stripes);
1482
1483                 spin_lock(&obd->obd_osfs_lock);
1484                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1485                 obd->obd_osfs_age = cfs_time_current_64();
1486                 spin_unlock(&obd->obd_osfs_lock);
1487                 RETURN(0);
1488         }
1489
1490         RETURN(-EIO);
1491 }
1492
1493 int lov_fini_statfs_set(struct lov_request_set *set)
1494 {
1495         int rc = 0;
1496         ENTRY;
1497
1498         if (set == NULL)
1499                 RETURN(0);
1500
1501         if (set->set_completes) {
1502                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1503                                      set->set_success);
1504         }
1505
1506         if (atomic_dec_and_test(&set->set_refcount))
1507                 lov_finish_set(set);
1508
1509         RETURN(rc);
1510 }
1511
1512 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1513                        int success)
1514 {
1515         int shift = 0, quit = 0;
1516         __u64 tmp;
1517
1518         if (success == 0) {
1519                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1520         } else {
1521                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1522                         /* assume all block sizes are always powers of 2 */
1523                         /* get the bits difference */
1524                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1525                         for (shift = 0; shift <= 64; ++shift) {
1526                                 if (tmp & 1) {
1527                                         if (quit)
1528                                                 break;
1529                                         else
1530                                                 quit = 1;
1531                                         shift = 0;
1532                                 }
1533                                 tmp >>= 1;
1534                         }
1535                 }
1536
1537                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1538                         osfs->os_bsize = lov_sfs->os_bsize;
1539
1540                         osfs->os_bfree  >>= shift;
1541                         osfs->os_bavail >>= shift;
1542                         osfs->os_blocks >>= shift;
1543                 } else if (shift != 0) {
1544                         lov_sfs->os_bfree  >>= shift;
1545                         lov_sfs->os_bavail >>= shift;
1546                         lov_sfs->os_blocks >>= shift;
1547                 }
1548 #ifdef MIN_DF
1549                 /* Sandia requested that df (and so, statfs) only
1550                    returned minimal available space on
1551                    a single OST, so people would be able to
1552                    write this much data guaranteed. */
1553                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1554                         /* Presumably if new bavail is smaller,
1555                            new bfree is bigger as well */
1556                         osfs->os_bfree = lov_sfs->os_bfree;
1557                         osfs->os_bavail = lov_sfs->os_bavail;
1558                 }
1559 #else
1560                 osfs->os_bfree += lov_sfs->os_bfree;
1561                 osfs->os_bavail += lov_sfs->os_bavail;
1562 #endif
1563                 osfs->os_blocks += lov_sfs->os_blocks;
1564                 /* XXX not sure about this one - depends on policy.
1565                  *   - could be minimum if we always stripe on all OBDs
1566                  *     (but that would be wrong for any other policy,
1567                  *     if one of the OBDs has no more objects left)
1568                  *   - could be sum if we stripe whole objects
1569                  *   - could be average, just to give a nice number
1570                  *
1571                  * To give a "reasonable" (if not wholly accurate)
1572                  * number, we divide the total number of free objects
1573                  * by expected stripe count (watch out for overflow).
1574                  */
1575                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1576                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1577         }
1578 }
1579
1580 /* The callback for osc_statfs_async that finilizes a request info when a
1581  * response is received. */
1582 static int cb_statfs_update(void *cookie, int rc)
1583 {
1584         struct obd_info *oinfo = cookie;
1585         struct lov_request *lovreq;
1586         struct obd_statfs *osfs, *lov_sfs;
1587         struct obd_device *obd;
1588         struct lov_obd *lov;
1589         int success;
1590         ENTRY;
1591
1592         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1593         lov = &lovreq->rq_rqset->set_obd->u.lov;
1594         obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1595
1596         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1597         lov_sfs = oinfo->oi_osfs;
1598
1599         success = lovreq->rq_rqset->set_success;
1600         /* XXX: the same is done in lov_update_common_set, however
1601            lovset->set_exp is not initialized. */
1602         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1603         if (rc) {
1604                 /* XXX ignore error for disconnected ost ? */
1605                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1606                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1607                         rc = 0;
1608                 GOTO(out, rc);
1609         }
1610
1611         spin_lock(&obd->obd_osfs_lock);
1612         memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1613         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1614                 obd->obd_osfs_age = cfs_time_current_64();
1615         spin_unlock(&obd->obd_osfs_lock);
1616
1617         lov_update_statfs(osfs, lov_sfs, success);
1618         qos_update(lov);
1619 out:
1620         if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1621             lov_finished_set(lovreq->rq_rqset)) {
1622                lov_statfs_interpret(NULL, lovreq->rq_rqset,
1623                                     lovreq->rq_rqset->set_success !=
1624                                                   lovreq->rq_rqset->set_count);
1625                qos_statfs_done(lov);
1626         }
1627
1628         RETURN(0);
1629 }
1630
1631 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1632                         struct lov_request_set **reqset)
1633 {
1634         struct lov_request_set *set;
1635         struct lov_obd *lov = &obd->u.lov;
1636         int rc = 0, i;
1637         ENTRY;
1638
1639         OBD_ALLOC(set, sizeof(*set));
1640         if (set == NULL)
1641                 RETURN(-ENOMEM);
1642         lov_init_set(set);
1643
1644         set->set_obd = obd;
1645         set->set_oi = oinfo;
1646
1647         /* We only get block data from the OBD */
1648         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1649                 struct lov_request *req;
1650
1651                 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1652                                           && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1653                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1654                         continue;
1655                 }
1656
1657                 OBD_ALLOC(req, sizeof(*req));
1658                 if (req == NULL)
1659                         GOTO(out_set, rc = -ENOMEM);
1660
1661                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1662                 if (req->rq_oi.oi_osfs == NULL) {
1663                         OBD_FREE(req, sizeof(*req));
1664                         GOTO(out_set, rc = -ENOMEM);
1665                 }
1666
1667                 req->rq_idx = i;
1668                 req->rq_oi.oi_cb_up = cb_statfs_update;
1669                 req->rq_oi.oi_flags = oinfo->oi_flags;
1670
1671                 lov_set_add_req(req, set);
1672         }
1673         if (!set->set_count)
1674                 GOTO(out_set, rc = -EIO);
1675         *reqset = set;
1676         RETURN(rc);
1677 out_set:
1678         lov_fini_statfs_set(set);
1679         RETURN(rc);
1680 }