Whamcloud - gitweb
Don't return an error if objects have been partially created for the file.
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_LOV
41
42 #ifdef __KERNEL__
43 #include <libcfs/libcfs.h>
44 #else
45 #include <liblustre.h>
46 #endif
47
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre/lustre_idl.h>
51
52 #include "lov_internal.h"
53
54 static void lov_init_set(struct lov_request_set *set)
55 {
56         set->set_count = 0;
57         set->set_completes = 0;
58         set->set_success = 0;
59         set->set_cookies = 0;
60         CFS_INIT_LIST_HEAD(&set->set_list);
61         atomic_set(&set->set_refcount, 1);
62 }
63
64 static void lov_finish_set(struct lov_request_set *set)
65 {
66         struct list_head *pos, *n;
67         ENTRY;
68
69         LASSERT(set);
70         list_for_each_safe(pos, n, &set->set_list) {
71                 struct lov_request *req = list_entry(pos, struct lov_request,
72                                                      rq_link);
73                 list_del_init(&req->rq_link);
74
75                 if (req->rq_oi.oi_oa)
76                         OBDO_FREE(req->rq_oi.oi_oa);
77                 if (req->rq_oi.oi_md)
78                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
79                 if (req->rq_oi.oi_osfs)
80                         OBD_FREE(req->rq_oi.oi_osfs,
81                                  sizeof(*req->rq_oi.oi_osfs));
82                 OBD_FREE(req, sizeof(*req));
83         }
84
85         if (set->set_pga) {
86                 int len = set->set_oabufs * sizeof(*set->set_pga);
87                 OBD_FREE(set->set_pga, len);
88         }
89         if (set->set_lockh)
90                 lov_llh_put(set->set_lockh);
91
92         OBD_FREE(set, sizeof(*set));
93         EXIT;
94 }
95
96 void lov_update_set(struct lov_request_set *set,
97                     struct lov_request *req, int rc)
98 {
99         req->rq_complete = 1;
100         req->rq_rc = rc;
101
102         set->set_completes++;
103         if (rc == 0)
104                 set->set_success++;
105 }
106
107 int lov_update_common_set(struct lov_request_set *set,
108                           struct lov_request *req, int rc)
109 {
110         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
111         ENTRY;
112
113         lov_update_set(set, req, rc);
114
115         /* grace error on inactive ost */
116         if (rc && !(lov->lov_tgts[req->rq_idx] &&
117                     lov->lov_tgts[req->rq_idx]->ltd_active))
118                 rc = 0;
119
120         /* FIXME in raid1 regime, should return 0 */
121         RETURN(rc);
122 }
123
124 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
125 {
126         list_add_tail(&req->rq_link, &set->set_list);
127         set->set_count++;
128 }
129
130 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
131                                struct lov_oinfo *loi, int flags,
132                                struct ost_lvb *lvb, __u32 mode, int rc);
133
134 static int lov_update_enqueue_lov(struct obd_export *exp,
135                                   struct lustre_handle *lov_lockhp,
136                                   struct lov_oinfo *loi, int flags, int idx,
137                                   __u64 oid, int rc)
138 {
139         struct lov_obd *lov = &exp->exp_obd->u.lov;
140
141         if (rc != ELDLM_OK &&
142             !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
143                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
144                 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
145                         /* -EUSERS used by OST to report file contention */
146                         if (rc != -EINTR && rc != -EUSERS)
147                                 CERROR("enqueue objid "LPX64" subobj "
148                                        LPX64" on OST idx %d: rc %d\n",
149                                        oid, loi->loi_id, loi->loi_ost_idx, rc);
150                 } else
151                         rc = ELDLM_OK;
152         }
153         return rc;
154 }
155
156 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
157 {
158         struct lov_request_set *set = req->rq_rqset;
159         struct lustre_handle *lov_lockhp;
160         struct obd_info *oi = set->set_oi;
161         struct lov_oinfo *loi;
162         ENTRY;
163
164         LASSERT(oi != NULL);
165
166         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
167         loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
168
169         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
170          * and that copy can be arbitrarily out of date.
171          *
172          * The LOV API is due for a serious rewriting anyways, and this
173          * can be addressed then. */
174
175         lov_stripe_lock(oi->oi_md);
176         osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
177                            &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
178         if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
179                 memset(lov_lockhp, 0, sizeof *lov_lockhp);
180         rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
181                                     req->rq_idx, oi->oi_md->lsm_object_id, rc);
182         lov_stripe_unlock(oi->oi_md);
183         lov_update_set(set, req, rc);
184         RETURN(rc);
185 }
186
187 /* The callback for osc_enqueue that updates lov info for every OSC request. */
188 static int cb_update_enqueue(void *cookie, int rc)
189 {
190         struct obd_info *oinfo = cookie;
191         struct ldlm_enqueue_info *einfo;
192         struct lov_request *lovreq;
193
194         lovreq = container_of(oinfo, struct lov_request, rq_oi);
195         einfo = lovreq->rq_rqset->set_ei;
196         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
197 }
198
199 static int enqueue_done(struct lov_request_set *set, __u32 mode)
200 {
201         struct lov_request *req;
202         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
203         int rc = 0;
204         ENTRY;
205
206         /* enqueue/match success, just return */
207         if (set->set_completes && set->set_completes == set->set_success)
208                 RETURN(0);
209
210         /* cancel enqueued/matched locks */
211         list_for_each_entry(req, &set->set_list, rq_link) {
212                 struct lustre_handle *lov_lockhp;
213
214                 if (!req->rq_complete || req->rq_rc)
215                         continue;
216
217                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
218                 LASSERT(lov_lockhp);
219                 if (!lustre_handle_is_used(lov_lockhp))
220                         continue;
221
222                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
223                                 req->rq_oi.oi_md, mode, lov_lockhp);
224                 if (rc && lov->lov_tgts[req->rq_idx] &&
225                     lov->lov_tgts[req->rq_idx]->ltd_active)
226                         CERROR("cancelling obdjid "LPX64" on OST "
227                                "idx %d error: rc = %d\n",
228                                req->rq_oi.oi_md->lsm_object_id,
229                                req->rq_idx, rc);
230         }
231         if (set->set_lockh)
232                 lov_llh_put(set->set_lockh);
233         RETURN(rc);
234 }
235
236 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
237                          struct ptlrpc_request_set *rqset)
238 {
239         int ret = 0;
240         ENTRY;
241
242         if (set == NULL)
243                 RETURN(0);
244         LASSERT(set->set_exp);
245         /* Do enqueue_done only for sync requests and if any request
246          * succeeded. */
247         if (!rqset) {
248                 if (rc)
249                         set->set_completes = 0;
250                 ret = enqueue_done(set, mode);
251         } else if (set->set_lockh)
252                 lov_llh_put(set->set_lockh);
253
254         if (atomic_dec_and_test(&set->set_refcount))
255                 lov_finish_set(set);
256
257         RETURN(rc ? rc : ret);
258 }
259
260 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
261                          struct ldlm_enqueue_info *einfo,
262                          struct lov_request_set **reqset)
263 {
264         struct lov_obd *lov = &exp->exp_obd->u.lov;
265         struct lov_request_set *set;
266         int i, rc = 0;
267         ENTRY;
268
269         OBD_ALLOC(set, sizeof(*set));
270         if (set == NULL)
271                 RETURN(-ENOMEM);
272         lov_init_set(set);
273
274         set->set_exp = exp;
275         set->set_oi = oinfo;
276         set->set_ei = einfo;
277         set->set_lockh = lov_llh_new(oinfo->oi_md);
278         if (set->set_lockh == NULL)
279                 GOTO(out_set, rc = -ENOMEM);
280         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
281
282         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
283                 struct lov_oinfo *loi;
284                 struct lov_request *req;
285                 obd_off start, end;
286
287                 loi = oinfo->oi_md->lsm_oinfo[i];
288                 if (!lov_stripe_intersects(oinfo->oi_md, i,
289                                            oinfo->oi_policy.l_extent.start,
290                                            oinfo->oi_policy.l_extent.end,
291                                            &start, &end))
292                         continue;
293
294                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
295                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
296                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
297                         continue;
298                 }
299
300                 OBD_ALLOC(req, sizeof(*req));
301                 if (req == NULL)
302                         GOTO(out_set, rc = -ENOMEM);
303
304                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
305                         sizeof(struct lov_oinfo *) +
306                         sizeof(struct lov_oinfo);
307                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
308                 if (req->rq_oi.oi_md == NULL) {
309                         OBD_FREE(req, sizeof(*req));
310                         GOTO(out_set, rc = -ENOMEM);
311                 }
312                 req->rq_oi.oi_md->lsm_oinfo[0] =
313                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
314                         sizeof(struct lov_oinfo *);
315
316
317                 req->rq_rqset = set;
318                 /* Set lov request specific parameters. */
319                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
320                 req->rq_oi.oi_cb_up = cb_update_enqueue;
321                 req->rq_oi.oi_flags = oinfo->oi_flags;
322
323                 LASSERT(req->rq_oi.oi_lockh);
324
325                 req->rq_oi.oi_policy.l_extent.gid =
326                         oinfo->oi_policy.l_extent.gid;
327                 req->rq_oi.oi_policy.l_extent.start = start;
328                 req->rq_oi.oi_policy.l_extent.end = end;
329
330                 req->rq_idx = loi->loi_ost_idx;
331                 req->rq_stripe = i;
332
333                 /* XXX LOV STACKING: submd should be from the subobj */
334                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
335                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
336                 req->rq_oi.oi_md->lsm_stripe_count = 0;
337                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
338                         loi->loi_kms_valid;
339                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
340                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
341
342                 lov_set_add_req(req, set);
343         }
344         if (!set->set_count)
345                 GOTO(out_set, rc = -EIO);
346         *reqset = set;
347         RETURN(0);
348 out_set:
349         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
350         RETURN(rc);
351 }
352
353 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
354                          int rc)
355 {
356         int ret = rc;
357         ENTRY;
358
359         if (rc > 0)
360                 ret = 0;
361         else if (rc == 0)
362                 ret = 1;
363         lov_update_set(set, req, ret);
364         RETURN(rc);
365 }
366
367 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
368 {
369         int rc = 0;
370         ENTRY;
371
372         if (set == NULL)
373                 RETURN(0);
374         LASSERT(set->set_exp);
375         rc = enqueue_done(set, mode);
376         if ((set->set_count == set->set_success) &&
377             (flags & LDLM_FL_TEST_LOCK))
378                 lov_llh_put(set->set_lockh);
379
380         if (atomic_dec_and_test(&set->set_refcount))
381                 lov_finish_set(set);
382
383         RETURN(rc);
384 }
385
386 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
387                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
388                        __u32 mode, struct lustre_handle *lockh,
389                        struct lov_request_set **reqset)
390 {
391         struct lov_obd *lov = &exp->exp_obd->u.lov;
392         struct lov_request_set *set;
393         int i, rc = 0;
394         ENTRY;
395
396         OBD_ALLOC(set, sizeof(*set));
397         if (set == NULL)
398                 RETURN(-ENOMEM);
399         lov_init_set(set);
400
401         set->set_exp = exp;
402         set->set_oi = oinfo;
403         set->set_oi->oi_md = lsm;
404         set->set_lockh = lov_llh_new(lsm);
405         if (set->set_lockh == NULL)
406                 GOTO(out_set, rc = -ENOMEM);
407         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
408
409         for (i = 0; i < lsm->lsm_stripe_count; i++){
410                 struct lov_oinfo *loi;
411                 struct lov_request *req;
412                 obd_off start, end;
413
414                 loi = lsm->lsm_oinfo[i];
415                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
416                                            policy->l_extent.end, &start, &end))
417                         continue;
418
419                 /* FIXME raid1 should grace this error */
420                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
421                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
422                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
423                         GOTO(out_set, rc = -EIO);
424                 }
425
426                 OBD_ALLOC(req, sizeof(*req));
427                 if (req == NULL)
428                         GOTO(out_set, rc = -ENOMEM);
429
430                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
431                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
432                 if (req->rq_oi.oi_md == NULL) {
433                         OBD_FREE(req, sizeof(*req));
434                         GOTO(out_set, rc = -ENOMEM);
435                 }
436
437                 req->rq_oi.oi_policy.l_extent.start = start;
438                 req->rq_oi.oi_policy.l_extent.end = end;
439                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
440
441                 req->rq_idx = loi->loi_ost_idx;
442                 req->rq_stripe = i;
443
444                 /* XXX LOV STACKING: submd should be from the subobj */
445                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
446                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
447                 req->rq_oi.oi_md->lsm_stripe_count = 0;
448
449                 lov_set_add_req(req, set);
450         }
451         if (!set->set_count)
452                 GOTO(out_set, rc = -EIO);
453         *reqset = set;
454         RETURN(rc);
455 out_set:
456         lov_fini_match_set(set, mode, 0);
457         RETURN(rc);
458 }
459
460 int lov_fini_cancel_set(struct lov_request_set *set)
461 {
462         int rc = 0;
463         ENTRY;
464
465         if (set == NULL)
466                 RETURN(0);
467
468         LASSERT(set->set_exp);
469         if (set->set_lockh)
470                 lov_llh_put(set->set_lockh);
471
472         if (atomic_dec_and_test(&set->set_refcount))
473                 lov_finish_set(set);
474
475         RETURN(rc);
476 }
477
478 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
479                         struct lov_stripe_md *lsm, __u32 mode,
480                         struct lustre_handle *lockh,
481                         struct lov_request_set **reqset)
482 {
483         struct lov_request_set *set;
484         int i, rc = 0;
485         ENTRY;
486
487         OBD_ALLOC(set, sizeof(*set));
488         if (set == NULL)
489                 RETURN(-ENOMEM);
490         lov_init_set(set);
491
492         set->set_exp = exp;
493         set->set_oi = oinfo;
494         set->set_oi->oi_md = lsm;
495         set->set_lockh = lov_handle2llh(lockh);
496         if (set->set_lockh == NULL) {
497                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
498                 GOTO(out_set, rc = -EINVAL);
499         }
500         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
501
502         for (i = 0; i < lsm->lsm_stripe_count; i++){
503                 struct lov_request *req;
504                 struct lustre_handle *lov_lockhp;
505                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
506
507                 lov_lockhp = set->set_lockh->llh_handles + i;
508                 if (!lustre_handle_is_used(lov_lockhp)) {
509                         CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n",
510                                loi->loi_ost_idx, loi->loi_id);
511                         continue;
512                 }
513
514                 OBD_ALLOC(req, sizeof(*req));
515                 if (req == NULL)
516                         GOTO(out_set, rc = -ENOMEM);
517
518                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
519                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
520                 if (req->rq_oi.oi_md == NULL) {
521                         OBD_FREE(req, sizeof(*req));
522                         GOTO(out_set, rc = -ENOMEM);
523                 }
524
525                 req->rq_idx = loi->loi_ost_idx;
526                 req->rq_stripe = i;
527
528                 /* XXX LOV STACKING: submd should be from the subobj */
529                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
530                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
531                 req->rq_oi.oi_md->lsm_stripe_count = 0;
532
533                 lov_set_add_req(req, set);
534         }
535         if (!set->set_count)
536                 GOTO(out_set, rc = -EIO);
537         *reqset = set;
538         RETURN(rc);
539 out_set:
540         lov_fini_cancel_set(set);
541         RETURN(rc);
542 }
543
544 static int create_done(struct obd_export *exp, struct lov_request_set *set,
545                        struct lov_stripe_md **lsmp)
546 {
547         struct lov_obd *lov = &exp->exp_obd->u.lov;
548         struct obd_trans_info *oti = set->set_oti;
549         struct obdo *src_oa = set->set_oi->oi_oa;
550         struct lov_request *req;
551         struct obdo *ret_oa = NULL;
552         int attrset = 0, rc = 0;
553         ENTRY;
554
555         LASSERT(set->set_completes);
556
557         /* try alloc objects on other osts if osc_create fails for
558          * exceptions: RPC failure, ENOSPC, etc */
559         if (set->set_count != set->set_success) {
560                 list_for_each_entry (req, &set->set_list, rq_link) {
561                         if (req->rq_rc == 0)
562                                 continue;
563
564                         set->set_completes--;
565                         req->rq_complete = 0;
566
567                         rc = qos_remedy_create(set, req);
568                         lov_update_create_set(set, req, rc);
569                 }
570         }
571
572         /* no successful creates */
573         if (set->set_success == 0)
574                 GOTO(cleanup, rc);
575
576         if (set->set_count != set->set_success) {
577                 set->set_count = set->set_success;
578                 qos_shrink_lsm(set);
579         }
580
581         OBDO_ALLOC(ret_oa);
582         if (ret_oa == NULL)
583                 GOTO(cleanup, rc = -ENOMEM);
584
585         list_for_each_entry(req, &set->set_list, rq_link) {
586                 if (!req->rq_complete || req->rq_rc)
587                         continue;
588                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
589                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
590                                 req->rq_stripe, &attrset);
591         }
592         if (src_oa->o_valid & OBD_MD_FLSIZE &&
593             ret_oa->o_size != src_oa->o_size) {
594                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
595                        src_oa->o_size, ret_oa->o_size);
596                 LBUG();
597         }
598         ret_oa->o_id = src_oa->o_id;
599         ret_oa->o_gr = src_oa->o_gr;
600         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
601         memcpy(src_oa, ret_oa, sizeof(*src_oa));
602         OBDO_FREE(ret_oa);
603
604         *lsmp = set->set_oi->oi_md;
605         GOTO(done, rc = 0);
606
607 cleanup:
608         list_for_each_entry(req, &set->set_list, rq_link) {
609                 struct obd_export *sub_exp;
610                 int err = 0;
611
612                 if (!req->rq_complete || req->rq_rc)
613                         continue;
614
615                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
616                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
617                                   NULL);
618                 if (err)
619                         CERROR("Failed to uncreate objid "LPX64" subobj "
620                                LPX64" on OST idx %d: rc = %d\n",
621                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
622                                req->rq_idx, rc);
623         }
624         if (*lsmp == NULL)
625                 obd_free_memmd(exp, &set->set_oi->oi_md);
626 done:
627         if (oti && set->set_cookies) {
628                 oti->oti_logcookies = set->set_cookies;
629                 if (!set->set_cookie_sent) {
630                         oti_free_cookies(oti);
631                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
632                 } else {
633                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
634                 }
635         }
636         RETURN(rc);
637 }
638
639 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
640 {
641         int rc = 0;
642         ENTRY;
643
644         if (set == NULL)
645                 RETURN(0);
646         LASSERT(set->set_exp);
647         if (set->set_completes)
648                 rc = create_done(set->set_exp, set, lsmp);
649
650         if (atomic_dec_and_test(&set->set_refcount))
651                 lov_finish_set(set);
652
653         RETURN(rc);
654 }
655
656 int lov_update_create_set(struct lov_request_set *set,
657                           struct lov_request *req, int rc)
658 {
659         struct obd_trans_info *oti = set->set_oti;
660         struct lov_stripe_md *lsm = set->set_oi->oi_md;
661         struct lov_oinfo *loi;
662         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
663         ENTRY;
664
665         req->rq_stripe = set->set_success;
666         loi = lsm->lsm_oinfo[req->rq_stripe];
667
668         if (rc && lov->lov_tgts[req->rq_idx] &&
669             lov->lov_tgts[req->rq_idx]->ltd_active) {
670                 CERROR("error creating fid "LPX64" sub-object"
671                        " on OST idx %d/%d: rc = %d\n",
672                        set->set_oi->oi_oa->o_id, req->rq_idx,
673                        lsm->lsm_stripe_count, rc);
674                 if (rc > 0) {
675                         CERROR("obd_create returned invalid err %d\n", rc);
676                         rc = -EIO;
677                 }
678         }
679         lov_update_set(set, req, rc);
680         if (rc)
681                 RETURN(rc);
682
683         loi->loi_id = req->rq_oi.oi_oa->o_id;
684         loi->loi_gr = req->rq_oi.oi_oa->o_gr;
685         loi->loi_ost_idx = req->rq_idx;
686         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
687                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
688         loi_init(loi);
689
690         if (oti && set->set_cookies)
691                 ++oti->oti_logcookies;
692         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
693                 set->set_cookie_sent++;
694
695         RETURN(0);
696 }
697
698 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
699                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
700                         struct obd_trans_info *oti,
701                         struct lov_request_set **reqset)
702 {
703         struct lov_request_set *set;
704         int rc = 0;
705         ENTRY;
706
707         OBD_ALLOC(set, sizeof(*set));
708         if (set == NULL)
709                 RETURN(-ENOMEM);
710         lov_init_set(set);
711
712         set->set_exp = exp;
713         set->set_oi = oinfo;
714         set->set_oi->oi_md = *lsmp;
715         set->set_oi->oi_oa = src_oa;
716         set->set_oti = oti;
717
718         rc = qos_prep_create(exp, set);
719         /* qos_shrink_lsm() may have allocated a new lsm */
720         *lsmp = oinfo->oi_md;
721         if (rc)
722                 lov_fini_create_set(set, lsmp);
723         else
724                 *reqset = set;
725         RETURN(rc);
726 }
727
728 static int common_attr_done(struct lov_request_set *set)
729 {
730         struct list_head *pos;
731         struct lov_request *req;
732         struct obdo *tmp_oa;
733         int rc = 0, attrset = 0;
734         ENTRY;
735
736         LASSERT(set->set_oi != NULL);
737
738         if (set->set_oi->oi_oa == NULL)
739                 RETURN(0);
740
741         if (!set->set_success)
742                 RETURN(-EIO);
743
744         OBDO_ALLOC(tmp_oa);
745         if (tmp_oa == NULL)
746                 GOTO(out, rc = -ENOMEM);
747
748         list_for_each (pos, &set->set_list) {
749                 req = list_entry(pos, struct lov_request, rq_link);
750
751                 if (!req->rq_complete || req->rq_rc)
752                         continue;
753                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
754                         continue;
755                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
756                                 req->rq_oi.oi_oa->o_valid,
757                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
758         }
759         if (!attrset) {
760                 CERROR("No stripes had valid attrs\n");
761                 rc = -EIO;
762         }
763         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
764         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
765 out:
766         if (tmp_oa)
767                 OBDO_FREE(tmp_oa);
768         RETURN(rc);
769
770 }
771
772 static int brw_done(struct lov_request_set *set)
773 {
774         struct lov_stripe_md *lsm = set->set_oi->oi_md;
775         struct lov_oinfo     *loi = NULL;
776         struct list_head *pos;
777         struct lov_request *req;
778         ENTRY;
779
780         list_for_each (pos, &set->set_list) {
781                 req = list_entry(pos, struct lov_request, rq_link);
782
783                 if (!req->rq_complete || req->rq_rc)
784                         continue;
785
786                 loi = lsm->lsm_oinfo[req->rq_stripe];
787
788                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
789                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
790         }
791
792         RETURN(0);
793 }
794
795 int lov_fini_brw_set(struct lov_request_set *set)
796 {
797         int rc = 0;
798         ENTRY;
799
800         if (set == NULL)
801                 RETURN(0);
802         LASSERT(set->set_exp);
803         if (set->set_completes) {
804                 rc = brw_done(set);
805                 /* FIXME update qos data here */
806         }
807         if (atomic_dec_and_test(&set->set_refcount))
808                 lov_finish_set(set);
809
810         RETURN(rc);
811 }
812
813 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
814                      obd_count oa_bufs, struct brw_page *pga,
815                      struct obd_trans_info *oti,
816                      struct lov_request_set **reqset)
817 {
818         struct {
819                 obd_count       index;
820                 obd_count       count;
821                 obd_count       off;
822         } *info = NULL;
823         struct lov_request_set *set;
824         struct lov_obd *lov = &exp->exp_obd->u.lov;
825         int rc = 0, i, shift;
826         ENTRY;
827
828         OBD_ALLOC(set, sizeof(*set));
829         if (set == NULL)
830                 RETURN(-ENOMEM);
831         lov_init_set(set);
832
833         set->set_exp = exp;
834         set->set_oti = oti;
835         set->set_oi = oinfo;
836         set->set_oabufs = oa_bufs;
837         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
838         if (!set->set_pga)
839                 GOTO(out, rc = -ENOMEM);
840
841         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
842         if (!info)
843                 GOTO(out, rc = -ENOMEM);
844
845         /* calculate the page count for each stripe */
846         for (i = 0; i < oa_bufs; i++) {
847                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
848                 info[stripe].count++;
849         }
850
851         /* alloc and initialize lov request */
852         shift = 0;
853         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
854                 struct lov_oinfo *loi = NULL;
855                 struct lov_request *req;
856
857                 if (info[i].count == 0)
858                         continue;
859
860                 loi = oinfo->oi_md->lsm_oinfo[i];
861                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
862                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
863                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
864                         GOTO(out, rc = -EIO);
865                 }
866
867                 OBD_ALLOC(req, sizeof(*req));
868                 if (req == NULL)
869                         GOTO(out, rc = -ENOMEM);
870
871                 OBDO_ALLOC(req->rq_oi.oi_oa);
872                 if (req->rq_oi.oi_oa == NULL) {
873                         OBD_FREE(req, sizeof(*req));
874                         GOTO(out, rc = -ENOMEM);
875                 }
876
877                 if (oinfo->oi_oa) {
878                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
879                                sizeof(*req->rq_oi.oi_oa));
880                 }
881                 req->rq_oi.oi_oa->o_id = loi->loi_id;
882                 req->rq_oi.oi_oa->o_stripe_idx = i;
883
884                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
885                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
886                 if (req->rq_oi.oi_md == NULL) {
887                         OBDO_FREE(req->rq_oi.oi_oa);
888                         OBD_FREE(req, sizeof(*req));
889                         GOTO(out, rc = -ENOMEM);
890                 }
891
892                 req->rq_idx = loi->loi_ost_idx;
893                 req->rq_stripe = i;
894
895                 /* XXX LOV STACKING */
896                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
897                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
898                 req->rq_oabufs = info[i].count;
899                 req->rq_pgaidx = shift;
900                 shift += req->rq_oabufs;
901
902                 /* remember the index for sort brw_page array */
903                 info[i].index = req->rq_pgaidx;
904
905                 req->rq_oi.oi_capa = oinfo->oi_capa;
906
907                 lov_set_add_req(req, set);
908         }
909         if (!set->set_count)
910                 GOTO(out, rc = -EIO);
911
912         /* rotate & sort the brw_page array */
913         for (i = 0; i < oa_bufs; i++) {
914                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
915
916                 shift = info[stripe].index + info[stripe].off;
917                 LASSERT(shift < oa_bufs);
918                 set->set_pga[shift] = pga[i];
919                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
920                                   &set->set_pga[shift].off);
921                 info[stripe].off++;
922         }
923 out:
924         if (info)
925                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
926
927         if (rc == 0)
928                 *reqset = set;
929         else
930                 lov_fini_brw_set(set);
931
932         RETURN(rc);
933 }
934
935 int lov_fini_getattr_set(struct lov_request_set *set)
936 {
937         int rc = 0;
938         ENTRY;
939
940         if (set == NULL)
941                 RETURN(0);
942         LASSERT(set->set_exp);
943         if (set->set_completes)
944                 rc = common_attr_done(set);
945
946         if (atomic_dec_and_test(&set->set_refcount))
947                 lov_finish_set(set);
948
949         RETURN(rc);
950 }
951
952 /* The callback for osc_getattr_async that finilizes a request info when a
953  * response is received. */
954 static int cb_getattr_update(void *cookie, int rc)
955 {
956         struct obd_info *oinfo = cookie;
957         struct lov_request *lovreq;
958         lovreq = container_of(oinfo, struct lov_request, rq_oi);
959         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
960 }
961
962 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
963                          struct lov_request_set **reqset)
964 {
965         struct lov_request_set *set;
966         struct lov_obd *lov = &exp->exp_obd->u.lov;
967         int rc = 0, i;
968         ENTRY;
969
970         OBD_ALLOC(set, sizeof(*set));
971         if (set == NULL)
972                 RETURN(-ENOMEM);
973         lov_init_set(set);
974
975         set->set_exp = exp;
976         set->set_oi = oinfo;
977
978         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
979                 struct lov_oinfo *loi;
980                 struct lov_request *req;
981
982                 loi = oinfo->oi_md->lsm_oinfo[i];
983                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
984                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
985                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
986                         continue;
987                 }
988
989                 OBD_ALLOC(req, sizeof(*req));
990                 if (req == NULL)
991                         GOTO(out_set, rc = -ENOMEM);
992
993                 req->rq_stripe = i;
994                 req->rq_idx = loi->loi_ost_idx;
995
996                 OBDO_ALLOC(req->rq_oi.oi_oa);
997                 if (req->rq_oi.oi_oa == NULL) {
998                         OBD_FREE(req, sizeof(*req));
999                         GOTO(out_set, rc = -ENOMEM);
1000                 }
1001                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1002                        sizeof(*req->rq_oi.oi_oa));
1003                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1004                 req->rq_oi.oi_cb_up = cb_getattr_update;
1005                 req->rq_oi.oi_capa = oinfo->oi_capa;
1006                 req->rq_rqset = set;
1007
1008                 lov_set_add_req(req, set);
1009         }
1010         if (!set->set_count)
1011                 GOTO(out_set, rc = -EIO);
1012         *reqset = set;
1013         RETURN(rc);
1014 out_set:
1015         lov_fini_getattr_set(set);
1016         RETURN(rc);
1017 }
1018
1019 int lov_fini_destroy_set(struct lov_request_set *set)
1020 {
1021         ENTRY;
1022
1023         if (set == NULL)
1024                 RETURN(0);
1025         LASSERT(set->set_exp);
1026         if (set->set_completes) {
1027                 /* FIXME update qos data here */
1028         }
1029
1030         if (atomic_dec_and_test(&set->set_refcount))
1031                 lov_finish_set(set);
1032
1033         RETURN(0);
1034 }
1035
1036 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1037                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1038                          struct obd_trans_info *oti,
1039                          struct lov_request_set **reqset)
1040 {
1041         struct lov_request_set *set;
1042         struct lov_obd *lov = &exp->exp_obd->u.lov;
1043         int rc = 0, i;
1044         ENTRY;
1045
1046         OBD_ALLOC(set, sizeof(*set));
1047         if (set == NULL)
1048                 RETURN(-ENOMEM);
1049         lov_init_set(set);
1050
1051         set->set_exp = exp;
1052         set->set_oi = oinfo;
1053         set->set_oi->oi_md = lsm;
1054         set->set_oi->oi_oa = src_oa;
1055         set->set_oti = oti;
1056         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1057                 set->set_cookies = oti->oti_logcookies;
1058
1059         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1060                 struct lov_oinfo *loi;
1061                 struct lov_request *req;
1062
1063                 loi = lsm->lsm_oinfo[i];
1064                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1065                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1066                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1067                         continue;
1068                 }
1069
1070                 OBD_ALLOC(req, sizeof(*req));
1071                 if (req == NULL)
1072                         GOTO(out_set, rc = -ENOMEM);
1073
1074                 req->rq_stripe = i;
1075                 req->rq_idx = loi->loi_ost_idx;
1076
1077                 OBDO_ALLOC(req->rq_oi.oi_oa);
1078                 if (req->rq_oi.oi_oa == NULL) {
1079                         OBD_FREE(req, sizeof(*req));
1080                         GOTO(out_set, rc = -ENOMEM);
1081                 }
1082                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1083                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1084                 lov_set_add_req(req, set);
1085         }
1086         if (!set->set_count)
1087                 GOTO(out_set, rc = -EIO);
1088         *reqset = set;
1089         RETURN(rc);
1090 out_set:
1091         lov_fini_destroy_set(set);
1092         RETURN(rc);
1093 }
1094
1095 int lov_fini_setattr_set(struct lov_request_set *set)
1096 {
1097         int rc = 0;
1098         ENTRY;
1099
1100         if (set == NULL)
1101                 RETURN(0);
1102         LASSERT(set->set_exp);
1103         if (set->set_completes) {
1104                 rc = common_attr_done(set);
1105                 /* FIXME update qos data here */
1106         }
1107
1108         if (atomic_dec_and_test(&set->set_refcount))
1109                 lov_finish_set(set);
1110         RETURN(rc);
1111 }
1112
1113 int lov_update_setattr_set(struct lov_request_set *set,
1114                            struct lov_request *req, int rc)
1115 {
1116         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1117         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1118         ENTRY;
1119
1120         lov_update_set(set, req, rc);
1121
1122         /* grace error on inactive ost */
1123         if (rc && !(lov->lov_tgts[req->rq_idx] &&
1124                     lov->lov_tgts[req->rq_idx]->ltd_active))
1125                 rc = 0;
1126
1127         if (rc == 0) {
1128                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1129                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1130                                 req->rq_oi.oi_oa->o_ctime;
1131                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1132                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1133                                 req->rq_oi.oi_oa->o_mtime;
1134                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1135                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1136                                 req->rq_oi.oi_oa->o_atime;
1137         }
1138
1139         RETURN(rc);
1140 }
1141
1142 /* The callback for osc_setattr_async that finilizes a request info when a
1143  * response is received. */
1144 static int cb_setattr_update(void *cookie, int rc)
1145 {
1146         struct obd_info *oinfo = cookie;
1147         struct lov_request *lovreq;
1148         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1149         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1150 }
1151
1152 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1153                          struct obd_trans_info *oti,
1154                          struct lov_request_set **reqset)
1155 {
1156         struct lov_request_set *set;
1157         struct lov_obd *lov = &exp->exp_obd->u.lov;
1158         int rc = 0, i;
1159         ENTRY;
1160
1161         OBD_ALLOC(set, sizeof(*set));
1162         if (set == NULL)
1163                 RETURN(-ENOMEM);
1164         lov_init_set(set);
1165
1166         set->set_exp = exp;
1167         set->set_oti = oti;
1168         set->set_oi = oinfo;
1169         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1170                 set->set_cookies = oti->oti_logcookies;
1171
1172         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1173                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1174                 struct lov_request *req;
1175
1176                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1177                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1178                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1179                         continue;
1180                 }
1181
1182                 OBD_ALLOC(req, sizeof(*req));
1183                 if (req == NULL)
1184                         GOTO(out_set, rc = -ENOMEM);
1185                 req->rq_stripe = i;
1186                 req->rq_idx = loi->loi_ost_idx;
1187
1188                 OBDO_ALLOC(req->rq_oi.oi_oa);
1189                 if (req->rq_oi.oi_oa == NULL) {
1190                         OBD_FREE(req, sizeof(*req));
1191                         GOTO(out_set, rc = -ENOMEM);
1192                 }
1193                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1194                        sizeof(*req->rq_oi.oi_oa));
1195                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1196                 LASSERTF(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) ||
1197                          CHECK_MDS_GROUP(req->rq_oi.oi_oa->o_gr),
1198                          "req->rq_oi.oi_oa->o_valid="LPX64" "
1199                          "req->rq_oi.oi_oa->o_gr="LPU64"\n",
1200                          req->rq_oi.oi_oa->o_valid, req->rq_oi.oi_oa->o_gr);
1201                 req->rq_oi.oi_oa->o_stripe_idx = i;
1202                 req->rq_oi.oi_cb_up = cb_setattr_update;
1203                 req->rq_oi.oi_capa = oinfo->oi_capa;
1204                 req->rq_rqset = set;
1205
1206                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1207                         int off = lov_stripe_offset(oinfo->oi_md,
1208                                                     oinfo->oi_oa->o_size, i,
1209                                                     &req->rq_oi.oi_oa->o_size);
1210
1211                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1212                                 req->rq_oi.oi_oa->o_size--;
1213
1214                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1215                                i, req->rq_oi.oi_oa->o_size,
1216                                oinfo->oi_oa->o_size);
1217                 }
1218                 lov_set_add_req(req, set);
1219         }
1220         if (!set->set_count)
1221                 GOTO(out_set, rc = -EIO);
1222         *reqset = set;
1223         RETURN(rc);
1224 out_set:
1225         lov_fini_setattr_set(set);
1226         RETURN(rc);
1227 }
1228
1229 int lov_fini_punch_set(struct lov_request_set *set)
1230 {
1231         int rc = 0;
1232         ENTRY;
1233
1234         if (set == NULL)
1235                 RETURN(0);
1236         LASSERT(set->set_exp);
1237         if (set->set_completes) {
1238                 rc = -EIO;
1239                 /* FIXME update qos data here */
1240                 if (set->set_success)
1241                         rc = common_attr_done(set);
1242         }
1243
1244         if (atomic_dec_and_test(&set->set_refcount))
1245                 lov_finish_set(set);
1246
1247         RETURN(rc);
1248 }
1249
1250 int lov_update_punch_set(struct lov_request_set *set,
1251                          struct lov_request *req, int rc)
1252 {
1253         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1254         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1255         ENTRY;
1256
1257         lov_update_set(set, req, rc);
1258
1259         /* grace error on inactive ost */
1260         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1261                 rc = 0;
1262
1263         if (rc == 0) {
1264                 lov_stripe_lock(lsm);
1265                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1266                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1267                                 req->rq_oi.oi_oa->o_blocks;
1268                 }
1269
1270                 /* Do we need to update lvb_size here? It needn't because
1271                  * it have been done in ll_truncate(). -jay */
1272                 lov_stripe_unlock(lsm);
1273         }
1274
1275         RETURN(rc);
1276 }
1277
1278 /* The callback for osc_punch that finilizes a request info when a response
1279  * is received. */
1280 static int cb_update_punch(void *cookie, int rc)
1281 {
1282         struct obd_info *oinfo = cookie;
1283         struct lov_request *lovreq;
1284         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1285         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1286 }
1287
1288 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1289                        struct obd_trans_info *oti,
1290                        struct lov_request_set **reqset)
1291 {
1292         struct lov_request_set *set;
1293         struct lov_obd *lov = &exp->exp_obd->u.lov;
1294         int rc = 0, i;
1295         ENTRY;
1296
1297         OBD_ALLOC(set, sizeof(*set));
1298         if (set == NULL)
1299                 RETURN(-ENOMEM);
1300         lov_init_set(set);
1301
1302         set->set_oi = oinfo;
1303         set->set_exp = exp;
1304
1305         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1306                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1307                 struct lov_request *req;
1308                 obd_off rs, re;
1309
1310                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1311                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1312                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1313                         continue;
1314                 }
1315
1316                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1317                                            oinfo->oi_policy.l_extent.start,
1318                                            oinfo->oi_policy.l_extent.end,
1319                                            &rs, &re))
1320                         continue;
1321
1322                 OBD_ALLOC(req, sizeof(*req));
1323                 if (req == NULL)
1324                         GOTO(out_set, rc = -ENOMEM);
1325                 req->rq_stripe = i;
1326                 req->rq_idx = loi->loi_ost_idx;
1327
1328                 OBDO_ALLOC(req->rq_oi.oi_oa);
1329                 if (req->rq_oi.oi_oa == NULL) {
1330                         OBD_FREE(req, sizeof(*req));
1331                         GOTO(out_set, rc = -ENOMEM);
1332                 }
1333                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1334                        sizeof(*req->rq_oi.oi_oa));
1335                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1336                 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1337                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1338
1339                 req->rq_oi.oi_oa->o_stripe_idx = i;
1340                 req->rq_oi.oi_cb_up = cb_update_punch;
1341                 req->rq_rqset = set;
1342
1343                 req->rq_oi.oi_policy.l_extent.start = rs;
1344                 req->rq_oi.oi_policy.l_extent.end = re;
1345                 req->rq_oi.oi_policy.l_extent.gid = -1;
1346
1347                 req->rq_oi.oi_capa = oinfo->oi_capa;
1348
1349                 lov_set_add_req(req, set);
1350         }
1351         if (!set->set_count)
1352                 GOTO(out_set, rc = -EIO);
1353         *reqset = set;
1354         RETURN(rc);
1355 out_set:
1356         lov_fini_punch_set(set);
1357         RETURN(rc);
1358 }
1359
1360 int lov_fini_sync_set(struct lov_request_set *set)
1361 {
1362         int rc = 0;
1363         ENTRY;
1364
1365         if (set == NULL)
1366                 RETURN(0);
1367         LASSERT(set->set_exp);
1368         if (set->set_completes) {
1369                 if (!set->set_success)
1370                         rc = -EIO;
1371                 /* FIXME update qos data here */
1372         }
1373
1374         if (atomic_dec_and_test(&set->set_refcount))
1375                 lov_finish_set(set);
1376
1377         RETURN(rc);
1378 }
1379
1380 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1381                       struct obdo *src_oa, struct lov_stripe_md *lsm,
1382                       obd_off start, obd_off end,
1383                       struct lov_request_set **reqset)
1384 {
1385         struct lov_request_set *set;
1386         struct lov_obd *lov = &exp->exp_obd->u.lov;
1387         int rc = 0, i;
1388         ENTRY;
1389
1390         OBD_ALLOC(set, sizeof(*set));
1391         if (set == NULL)
1392                 RETURN(-ENOMEM);
1393         lov_init_set(set);
1394
1395         set->set_exp = exp;
1396         set->set_oi = oinfo;
1397         set->set_oi->oi_md = lsm;
1398         set->set_oi->oi_oa = src_oa;
1399
1400         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1401                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1402                 struct lov_request *req;
1403                 obd_off rs, re;
1404
1405                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1406                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1407                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1408                         continue;
1409                 }
1410
1411                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1412                         continue;
1413
1414                 OBD_ALLOC(req, sizeof(*req));
1415                 if (req == NULL)
1416                         GOTO(out_set, rc = -ENOMEM);
1417                 req->rq_stripe = i;
1418                 req->rq_idx = loi->loi_ost_idx;
1419
1420                 OBDO_ALLOC(req->rq_oi.oi_oa);
1421                 if (req->rq_oi.oi_oa == NULL) {
1422                         OBD_FREE(req, sizeof(*req));
1423                         GOTO(out_set, rc = -ENOMEM);
1424                 }
1425                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1426                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1427                 req->rq_oi.oi_oa->o_stripe_idx = i;
1428
1429                 req->rq_oi.oi_policy.l_extent.start = rs;
1430                 req->rq_oi.oi_policy.l_extent.end = re;
1431                 req->rq_oi.oi_policy.l_extent.gid = -1;
1432                 req->rq_rqset = set;
1433
1434                 lov_set_add_req(req, set);
1435         }
1436         if (!set->set_count)
1437                 GOTO(out_set, rc = -EIO);
1438         *reqset = set;
1439         RETURN(rc);
1440 out_set:
1441         lov_fini_sync_set(set);
1442         RETURN(rc);
1443 }
1444
/* Largest value representable in a __u64. */
#define LOV_U64_MAX ((__u64)~0ULL)

/* Saturating add: tot += add, clamped to LOV_U64_MAX when the sum wraps.
 * Both arguments are evaluated more than once. */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1453
1454 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1455 {
1456         ENTRY;
1457
1458         if (success) {
1459                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1460
1461                 if (osfs->os_files != LOV_U64_MAX)
1462                         do_div(osfs->os_files, expected_stripes);
1463                 if (osfs->os_ffree != LOV_U64_MAX)
1464                         do_div(osfs->os_ffree, expected_stripes);
1465
1466                 spin_lock(&obd->obd_osfs_lock);
1467                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1468                 obd->obd_osfs_age = cfs_time_current_64();
1469                 spin_unlock(&obd->obd_osfs_lock);
1470                 RETURN(0);
1471         }
1472
1473         RETURN(-EIO);
1474 }
1475
1476 int lov_fini_statfs_set(struct lov_request_set *set)
1477 {
1478         int rc = 0;
1479         ENTRY;
1480
1481         if (set == NULL)
1482                 RETURN(0);
1483
1484         if (set->set_completes) {
1485                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1486                                      set->set_success);
1487         }
1488
1489         if (atomic_dec_and_test(&set->set_refcount))
1490                 lov_finish_set(set);
1491
1492         RETURN(rc);
1493 }
1494
1495 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1496                        int success)
1497 {
1498         int shift = 0, quit = 0;
1499         __u64 tmp;
1500
1501         if (success == 0) {
1502                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1503         } else {
1504                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1505                         /* assume all block sizes are always powers of 2 */
1506                         /* get the bits difference */
1507                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1508                         for (shift = 0; shift <= 64; ++shift) {
1509                                 if (tmp & 1) {
1510                                         if (quit)
1511                                                 break;
1512                                         else
1513                                                 quit = 1;
1514                                         shift = 0;
1515                                 }
1516                                 tmp >>= 1;
1517                         }
1518                 }
1519
1520                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1521                         osfs->os_bsize = lov_sfs->os_bsize;
1522
1523                         osfs->os_bfree  >>= shift;
1524                         osfs->os_bavail >>= shift;
1525                         osfs->os_blocks >>= shift;
1526                 } else if (shift != 0) {
1527                         lov_sfs->os_bfree  >>= shift;
1528                         lov_sfs->os_bavail >>= shift;
1529                         lov_sfs->os_blocks >>= shift;
1530                 }
1531 #ifdef MIN_DF
1532                 /* Sandia requested that df (and so, statfs) only
1533                    returned minimal available space on
1534                    a single OST, so people would be able to
1535                    write this much data guaranteed. */
1536                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1537                         /* Presumably if new bavail is smaller,
1538                            new bfree is bigger as well */
1539                         osfs->os_bfree = lov_sfs->os_bfree;
1540                         osfs->os_bavail = lov_sfs->os_bavail;
1541                 }
1542 #else
1543                 osfs->os_bfree += lov_sfs->os_bfree;
1544                 osfs->os_bavail += lov_sfs->os_bavail;
1545 #endif
1546                 osfs->os_blocks += lov_sfs->os_blocks;
1547                 /* XXX not sure about this one - depends on policy.
1548                  *   - could be minimum if we always stripe on all OBDs
1549                  *     (but that would be wrong for any other policy,
1550                  *     if one of the OBDs has no more objects left)
1551                  *   - could be sum if we stripe whole objects
1552                  *   - could be average, just to give a nice number
1553                  *
1554                  * To give a "reasonable" (if not wholly accurate)
1555                  * number, we divide the total number of free objects
1556                  * by expected stripe count (watch out for overflow).
1557                  */
1558                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1559                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1560         }
1561 }
1562
1563 /* The callback for osc_statfs_async that finilizes a request info when a
1564  * response is received. */
1565 static int cb_statfs_update(void *cookie, int rc)
1566 {
1567         struct obd_info *oinfo = cookie;
1568         struct lov_request *lovreq;
1569         struct obd_statfs *osfs, *lov_sfs;
1570         struct obd_device *obd;
1571         struct lov_obd *lov;
1572         int success;
1573         ENTRY;
1574
1575         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1576         lov = &lovreq->rq_rqset->set_obd->u.lov;
1577         obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1578
1579         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1580         lov_sfs = oinfo->oi_osfs;
1581
1582         success = lovreq->rq_rqset->set_success;
1583         /* XXX: the same is done in lov_update_common_set, however
1584            lovset->set_exp is not initialized. */
1585         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1586         if (rc) {
1587                 /* XXX ignore error for disconnected ost ? */
1588                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1589                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1590                         rc = 0;
1591                 GOTO(out, rc);
1592         }
1593
1594         spin_lock(&obd->obd_osfs_lock);
1595         memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1596         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1597                 obd->obd_osfs_age = cfs_time_current_64();
1598         spin_unlock(&obd->obd_osfs_lock);
1599
1600         lov_update_statfs(osfs, lov_sfs, success);
1601         qos_update(lov);
1602 out:
1603         if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1604             lovreq->rq_rqset->set_count == lovreq->rq_rqset->set_completes) {
1605                lov_statfs_interpret(NULL, lovreq->rq_rqset,
1606                                     lovreq->rq_rqset->set_success !=
1607                                                   lovreq->rq_rqset->set_count);
1608                qos_statfs_done(lov);
1609         }
1610
1611         RETURN(0);
1612 }
1613
1614 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1615                         struct lov_request_set **reqset)
1616 {
1617         struct lov_request_set *set;
1618         struct lov_obd *lov = &obd->u.lov;
1619         int rc = 0, i;
1620         ENTRY;
1621
1622         OBD_ALLOC(set, sizeof(*set));
1623         if (set == NULL)
1624                 RETURN(-ENOMEM);
1625         lov_init_set(set);
1626
1627         set->set_obd = obd;
1628         set->set_oi = oinfo;
1629
1630         /* We only get block data from the OBD */
1631         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1632                 struct lov_request *req;
1633
1634                 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1635                                           && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1636                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1637                         continue;
1638                 }
1639
1640                 OBD_ALLOC(req, sizeof(*req));
1641                 if (req == NULL)
1642                         GOTO(out_set, rc = -ENOMEM);
1643
1644                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1645                 if (req->rq_oi.oi_osfs == NULL) {
1646                         OBD_FREE(req, sizeof(*req));
1647                         GOTO(out_set, rc = -ENOMEM);
1648                 }
1649
1650                 req->rq_idx = i;
1651                 req->rq_oi.oi_cb_up = cb_statfs_update;
1652                 req->rq_oi.oi_flags = oinfo->oi_flags;
1653                 req->rq_rqset = set;
1654
1655                 lov_set_add_req(req, set);
1656         }
1657         if (!set->set_count)
1658                 GOTO(out_set, rc = -EIO);
1659         *reqset = set;
1660         RETURN(rc);
1661 out_set:
1662         lov_fini_statfs_set(set);
1663         RETURN(rc);
1664 }