Whamcloud - gitweb
b=19380
[fs/lustre-release.git] / lustre / osc / osc_create.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/osc/osc_create.c
 * For testing and management it is treated as an obd_device,
 * although it does not export a full OBD method table (the
 * requests are coming in over the wire, so object target modules
 * do not have a full method table.)
 *
 * Author: Peter Braam <braam@clusterfs.com>
 */
44
45 #ifndef EXPORT_SYMTAB
46 # define EXPORT_SYMTAB
47 #endif
48 #define DEBUG_SUBSYSTEM S_OSC
49
50 #ifdef __KERNEL__
51 # include <libcfs/libcfs.h>
52 #else /* __KERNEL__ */
53 # include <liblustre.h>
54 #endif
55
56 #ifdef  __CYGWIN__
57 # include <ctype.h>
58 #endif
59
60 # include <lustre_dlm.h>
61 #include <obd_class.h>
62 #include <obd.h>
63 #include "osc_internal.h"
64
/* Upper bound on how long a create waits for precreated objects: half of
 * obd_timeout.  It is used as the wait limit in oscc_precreate() and as the
 * timeout handed to ptlrpc_prep_fakereq() in osc_create_async().
 * XXX need AT adjust ? */
#define osc_create_timeout      (obd_timeout / 2)
67
/* Per-request context of an asynchronous create.  It is stored in the
 * rq_async_args area of the fake request built by osc_create_async() and
 * read back by handle_async_create(). */
struct osc_create_async_args {
        /* creator state the object id is taken from */
        struct osc_creator      *rq_oscc;
        /* stripe md whose lsm_object_id receives the new id */
        struct lov_stripe_md    *rq_lsm;
        /* caller's request info: oi_oa gets filled in and oi_cb_up is
         * invoked with the final status */
        struct obd_info         *rq_oinfo;
};
73
/* Forward declarations: osc_interpret_create() below calls both of these
 * before their definitions appear. */
static int oscc_internal_create(struct osc_creator *oscc);
static int handle_async_create(struct ptlrpc_request *req, int rc);
76
77 static int osc_interpret_create(struct ptlrpc_request *req, void *data, int rc)
78 {
79         struct osc_creator *oscc;
80         struct ost_body *body = NULL;
81         struct ptlrpc_request *fake_req, *pos;
82         ENTRY;
83
84         if (req->rq_repmsg) {
85                 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
86                                           lustre_swab_ost_body);
87                 if (body == NULL && rc == 0)
88                         rc = -EPROTO;
89         }
90
91         oscc = req->rq_async_args.pointer_arg[0];
92         LASSERT(oscc && (oscc->oscc_obd != LP_POISON));
93
94         spin_lock(&oscc->oscc_lock);
95         oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
96         switch (rc) {
97         case 0: {
98                 if (body) {
99                         int diff = body->oa.o_id - oscc->oscc_last_id;
100
101                         /* oscc_internal_create() stores the original value of
102                          * grow_count in rq_async_args.space[0].
103                          * We can't compare against oscc_grow_count directly,
104                          * because it may have been increased while the RPC
105                          * is in flight, so we would always find ourselves
106                          * having created fewer objects and decreasing the
107                          * precreate request size.  b=18577 */
108                         if (diff < (int) req->rq_async_args.space[0]) {
109                                 /* the OST has not managed to create all the
110                                  * objects we asked for */
111                                 oscc->oscc_grow_count = max(diff,
112                                                             OST_MIN_PRECREATE);
113                                 /* don't bump grow_count next time */
114                                 oscc->oscc_flags |= OSCC_FLAG_LOW;
115                         } else {
116                                 /* the OST is able to keep up with the work,
117                                  * we could consider increasing grow_count
118                                  * next time if needed */
119                                 oscc->oscc_flags &= ~OSCC_FLAG_LOW;
120                         }
121                         oscc->oscc_last_id = body->oa.o_id;
122                 }
123                 spin_unlock(&oscc->oscc_lock);
124                 break;
125         }
126         case -ENOSPC:
127         case -EROFS:
128         case -EFBIG: {
129                 oscc->oscc_flags |= OSCC_FLAG_NOSPC;
130                 if (body && rc == -ENOSPC) {
131                         oscc->oscc_last_id = body->oa.o_id;
132                         oscc->oscc_grow_count = OST_MIN_PRECREATE;
133                 }
134                 spin_unlock(&oscc->oscc_lock);
135                 DEBUG_REQ(D_INODE, req, "OST out of space, flagging");
136                 break;
137         }
138         case -EIO: {
139                 /* filter always set body->oa.o_id as the last_id
140                  * of filter (see filter_handle_precreate for detail)*/
141                 if (body && body->oa.o_id > oscc->oscc_last_id)
142                         oscc->oscc_last_id = body->oa.o_id;
143                 spin_unlock(&oscc->oscc_lock);
144                 break;
145         }
146         case -EWOULDBLOCK: {
147                 /* aka EAGAIN we should not delay create if import failed -
148                  * this avoid client stick in create and avoid race with delorphan */
149                 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
150                 /* oscc->oscc_grow_count = OST_MIN_PRECREATE; */
151                 spin_unlock(&oscc->oscc_lock);
152                 break;
153         }
154         default: {
155                 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
156                 oscc->oscc_grow_count = OST_MIN_PRECREATE;
157                 spin_unlock(&oscc->oscc_lock);
158                 DEBUG_REQ(D_ERROR, req,
159                           "unknown rc %d from async create: failing oscc", rc);
160                 ptlrpc_fail_import(req->rq_import,
161                                    lustre_msg_get_conn_cnt(req->rq_reqmsg));
162         }
163         }
164
165         CDEBUG(D_RPCTRACE, "prealloc through id "LPU64", next to use "LPU64"\n",
166                oscc->oscc_last_id, oscc->oscc_next_id);
167
168         spin_lock(&oscc->oscc_lock);
169         list_for_each_entry_safe(fake_req, pos,
170                                  &oscc->oscc_wait_create_list, rq_list) {
171                 if (handle_async_create(fake_req, rc)  == -EAGAIN) {
172                         oscc_internal_create(oscc);
173                         /* sending request should be never fail because
174                          * osc use preallocated requests pool */
175                         GOTO(exit_wakeup, rc);
176                 }
177         }
178         spin_unlock(&oscc->oscc_lock);
179
180 exit_wakeup:
181         cfs_waitq_signal(&oscc->oscc_waitq);
182         RETURN(rc);
183 }
184
185 static int oscc_internal_create(struct osc_creator *oscc)
186 {
187         struct ptlrpc_request *request;
188         struct ost_body *body;
189         __u32 size[] = { sizeof(struct ptlrpc_body), sizeof(*body) };
190         ENTRY;
191
192         LASSERT_SPIN_LOCKED(&oscc->oscc_lock);
193
194         if(oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
195                 spin_unlock(&oscc->oscc_lock);
196                 RETURN(0);
197         }
198
199         /* we need check it before OSCC_FLAG_CREATING - because need
200          * see lower number of precreate objects */
201         if (oscc->oscc_grow_count < oscc->oscc_max_grow_count &&
202             ((oscc->oscc_flags & OSCC_FLAG_LOW) == 0) &&
203             (__s64)(oscc->oscc_last_id - oscc->oscc_next_id) <=
204                    (oscc->oscc_grow_count / 4 + 1)) {
205                 oscc->oscc_flags |= OSCC_FLAG_LOW;
206                 oscc->oscc_grow_count *= 2;
207         }
208
209         if (oscc->oscc_flags & OSCC_FLAG_CREATING) {
210                 spin_unlock(&oscc->oscc_lock);
211                 RETURN(0);
212         }
213
214         if (oscc->oscc_grow_count > oscc->oscc_max_grow_count / 2)
215                 oscc->oscc_grow_count = oscc->oscc_max_grow_count / 2;
216
217         oscc->oscc_flags |= OSCC_FLAG_CREATING;
218         spin_unlock(&oscc->oscc_lock);
219
220         request = ptlrpc_prep_req(oscc->oscc_obd->u.cli.cl_import,
221                                   LUSTRE_OST_VERSION, OST_CREATE, 2,
222                                   size, NULL);
223         if (request == NULL) {
224                 spin_lock(&oscc->oscc_lock);
225                 oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
226                 spin_unlock(&oscc->oscc_lock);
227                 RETURN(-ENOMEM);
228         }
229
230         request->rq_request_portal = OST_CREATE_PORTAL;
231         ptlrpc_at_set_req_timeout(request);
232         body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
233
234         spin_lock(&oscc->oscc_lock);
235         body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count;
236         body->oa.o_gr = 0;
237         body->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
238         request->rq_async_args.space[0] = oscc->oscc_grow_count;
239         spin_unlock(&oscc->oscc_lock);
240         CDEBUG(D_RPCTRACE, "prealloc through id "LPU64" (last seen "LPU64")\n",
241                body->oa.o_id, oscc->oscc_last_id);
242
243         /* we should not resend create request - anyway we will have delorphan
244          * and kill these objects */
245         request->rq_no_delay = request->rq_no_resend = 1;
246         ptlrpc_req_set_repsize(request, 2, size);
247
248         request->rq_async_args.pointer_arg[0] = oscc;
249         request->rq_interpret_reply = osc_interpret_create;
250         ptlrpcd_add_req(request);
251
252         RETURN(0);
253 }
254
255 static int oscc_has_objects_nolock(struct osc_creator *oscc, int count)
256 {
257         return ((__s64)(oscc->oscc_last_id - oscc->oscc_next_id) >= count);
258 }
259
260
261 static int oscc_has_objects(struct osc_creator *oscc, int count)
262 {
263         int have_objs;
264
265         spin_lock(&oscc->oscc_lock);
266         have_objs = oscc_has_objects_nolock(oscc, count);
267         spin_unlock(&oscc->oscc_lock);
268
269         return have_objs;
270 }
271
272 static int oscc_wait_for_objects(struct osc_creator *oscc, int count)
273 {
274         int have_objs;
275         int ost_full;
276         int osc_invalid;
277
278         osc_invalid = oscc->oscc_obd->u.cli.cl_import->imp_invalid;
279
280         spin_lock(&oscc->oscc_lock);
281         ost_full = (oscc->oscc_flags & OSCC_FLAG_NOSPC);
282         have_objs = oscc_has_objects_nolock(oscc, count);
283         osc_invalid |= oscc->oscc_flags & OSCC_FLAG_EXITING;
284
285         if (!ost_full || !osc_invalid)
286                 /* they release lock himself */
287                 oscc_internal_create(oscc);
288         else
289                 spin_unlock(&oscc->oscc_lock);
290
291         return have_objs || ost_full || osc_invalid;
292 }
293
294 static int oscc_precreate(struct osc_creator *oscc)
295 {
296         struct l_wait_info lwi;
297         int rc = 0;
298         ENTRY;
299
300         if (oscc_has_objects(oscc, oscc->oscc_grow_count / 2))
301                 RETURN(0);
302
303         /* we should be not block forever - because client's create rpc can
304          * stick in mds for long time and forbid client reconnect */
305         lwi = LWI_TIMEOUT(cfs_timeout_cap(cfs_time_seconds(osc_create_timeout)),
306                           NULL, NULL);
307
308         rc = l_wait_event(oscc->oscc_waitq, oscc_wait_for_objects(oscc, 1), &lwi);
309
310         if (!oscc_has_objects(oscc, 1) || (oscc->oscc_flags & OSCC_FLAG_NOSPC))
311                 rc = -ENOSPC;
312
313         if (oscc->oscc_obd->u.cli.cl_import->imp_invalid)
314                 rc = -EIO;
315
316         RETURN(rc);
317 }
318
319 static int oscc_recovering(struct osc_creator *oscc)
320 {
321         int recov;
322
323         spin_lock(&oscc->oscc_lock);
324         recov = oscc->oscc_flags & OSCC_FLAG_RECOVERING;
325         spin_unlock(&oscc->oscc_lock);
326
327         return recov;
328 }
329
330 static int oscc_in_sync(struct osc_creator *oscc)
331 {
332         int sync;
333
334         spin_lock(&oscc->oscc_lock);
335         sync = oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS;
336         spin_unlock(&oscc->oscc_lock);
337
338         return sync;
339 }
340
341 /* decide if the OST has remaining object, return value :
342         0 : the OST has remaining object, and don't need to do precreate.
343         1 : the OST has no remaining object, and will send a RPC for precreate.
344         2 : the OST has no remaining object, and will not get any for
345             a potentially very long time
346      1000 : unusable
347  */
348 int osc_precreate(struct obd_export *exp)
349 {
350         struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
351         struct obd_import *imp = exp->exp_imp_reverse;
352         ENTRY;
353
354         LASSERT(oscc != NULL);
355         if (imp != NULL && imp->imp_deactive)
356                 RETURN(1000);
357
358         /* until oscc in recovery - other flags is wrong */
359         if (oscc_recovering(oscc))
360                 RETURN(2);
361
362         if (oscc->oscc_flags & OSCC_FLAG_NOSPC)
363                 RETURN(1000);
364
365         if (oscc_has_objects(oscc, oscc->oscc_grow_count / 2))
366                 RETURN(0);
367
368         spin_lock(&oscc->oscc_lock);
369         if ((oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS) ||
370             (oscc->oscc_flags & OSCC_FLAG_CREATING)) {
371                 spin_unlock(&oscc->oscc_lock);
372                 RETURN(1);
373         }
374
375         oscc_internal_create(oscc);
376         RETURN(1);
377 }
378
379 static int handle_async_create(struct ptlrpc_request *req, int rc)
380 {
381         struct osc_create_async_args *args = ptlrpc_req_async_args(req);
382         struct osc_creator    *oscc = args->rq_oscc;
383         struct lov_stripe_md  *lsm  = args->rq_lsm;
384         struct obd_info       *oinfo = args->rq_oinfo;
385         struct obdo           *oa = oinfo->oi_oa;
386
387         LASSERT_SPIN_LOCKED(&oscc->oscc_lock);
388
389         if(rc)
390                 GOTO(out_wake, rc);
391
392         if ((oscc->oscc_flags & OSCC_FLAG_EXITING))
393                 GOTO(out_wake, rc = -EIO);
394
395         if (oscc_has_objects_nolock(oscc, 1)) {
396                 memcpy(oa, &oscc->oscc_oa, sizeof(*oa));
397                 oa->o_id = oscc->oscc_next_id;
398                 lsm->lsm_object_id = oscc->oscc_next_id;
399                 oscc->oscc_next_id++;
400
401                 CDEBUG(D_RPCTRACE, " set oscc_next_id = "LPU64"\n",
402                        oscc->oscc_next_id);
403                GOTO(out_wake, rc = 0);
404         }
405
406         /* should be try wait until recovery finished */
407         if(oscc->oscc_flags & OSCC_FLAG_RECOVERING)
408                 RETURN(-EAGAIN);
409
410         if (oscc->oscc_flags & OSCC_FLAG_NOSPC)
411                 GOTO(out_wake, rc = -ENOSPC);
412
413         /* we not have objects now - continue wait */
414         RETURN(-EAGAIN);
415
416 out_wake:
417
418         rc = oinfo->oi_cb_up(oinfo, rc);
419         ptlrpc_fakereq_finished(req);
420
421         RETURN(rc);
422 }
423
424 static int async_create_interpret(struct ptlrpc_request *req, void *data, int rc)
425 {
426         struct osc_create_async_args *args = ptlrpc_req_async_args(req);
427         struct osc_creator    *oscc = args->rq_oscc;
428         int ret;
429
430         spin_lock(&oscc->oscc_lock);
431         ret = handle_async_create(req, rc);
432         spin_unlock(&oscc->oscc_lock);
433
434         return ret;
435 }
436
437 int osc_create_async(struct obd_export *exp, struct obd_info *oinfo,
438                      struct lov_stripe_md **ea, struct obd_trans_info *oti)
439 {
440         int rc;
441         struct ptlrpc_request *fake_req;
442         struct osc_create_async_args *args;
443         struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
444         struct obdo *oa = oinfo->oi_oa;
445         ENTRY;
446
447         if ((oa->o_valid & OBD_MD_FLGROUP) && (oa->o_gr != 0)){
448                 rc = osc_real_create(exp, oinfo->oi_oa, ea, oti);
449                 rc = oinfo->oi_cb_up(oinfo, rc);
450                 RETURN(rc);
451         }
452
453         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
454             oa->o_flags == OBD_FL_RECREATE_OBJS) {
455                 rc = osc_real_create(exp, oinfo->oi_oa, ea, oti);
456                 rc = oinfo->oi_cb_up(oinfo, rc);
457                 RETURN(rc);
458         }
459
460         LASSERT((*ea) != NULL);
461
462         fake_req = ptlrpc_prep_fakereq(osc_create_timeout, async_create_interpret);
463         if (fake_req == NULL) {
464                 rc = oinfo->oi_cb_up(oinfo, -ENOMEM);
465                 RETURN(-ENOMEM);
466         }
467
468         args = ptlrpc_req_async_args(fake_req);
469         CLASSERT(sizeof(*args) <= sizeof(fake_req->rq_async_args));
470
471         args->rq_oscc  = oscc;
472         args->rq_lsm   = *ea;
473         args->rq_oinfo = oinfo;
474
475         spin_lock(&oscc->oscc_lock);
476         /* try fast path */
477         rc = handle_async_create(fake_req, 0);
478         if (rc == -EAGAIN) {
479                 int is_add;
480                 /* we not have objects - try wait */
481                 is_add = ptlrpcd_add_req(fake_req);
482                 if (!is_add)
483                         list_add(&fake_req->rq_list,
484                                  &oscc->oscc_wait_create_list);
485                 else
486                         rc = is_add;
487         }
488         spin_unlock(&oscc->oscc_lock);
489
490         if (rc != -EAGAIN)
491                 /* need free request if was error hit or
492                  * objects already allocated */
493                 ptlrpc_req_finished(fake_req);
494         else
495                 /* EAGAIN mean - request is delayed */
496                 rc = 0;
497
498         RETURN(rc);
499 }
500
501
502 int osc_create(struct obd_export *exp, struct obdo *oa,
503                struct lov_stripe_md **ea, struct obd_trans_info *oti)
504 {
505         struct lov_stripe_md *lsm;
506         struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
507         int rc = 0;
508
509         ENTRY;
510         LASSERT(oa);
511         LASSERT(ea);
512
513         if ((oa->o_valid & OBD_MD_FLGROUP) && (oa->o_gr != 0))
514                 RETURN(osc_real_create(exp, oa, ea, oti));
515
516         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
517             oa->o_flags == OBD_FL_RECREATE_OBJS) {
518                 RETURN(osc_real_create(exp, oa, ea, oti));
519         }
520
521         /* this is the special case where create removes orphans */
522         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
523             oa->o_flags == OBD_FL_DELORPHAN) {
524                 spin_lock(&oscc->oscc_lock);
525                 if (oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS) {
526                         spin_unlock(&oscc->oscc_lock);
527                         RETURN(-EBUSY);
528                 }
529                 if (!(oscc->oscc_flags & OSCC_FLAG_RECOVERING)) {
530                         spin_unlock(&oscc->oscc_lock);
531                         RETURN(0);
532                 }
533
534                 oscc->oscc_flags |= OSCC_FLAG_SYNC_IN_PROGRESS;
535                 /* seting flag LOW we prevent extra grow precreate size
536                  * and enforce use last assigned size */
537                 oscc->oscc_flags |= OSCC_FLAG_LOW;
538                 spin_unlock(&oscc->oscc_lock);
539                 CDEBUG(D_HA, "%s: oscc recovery started - delete to "LPU64"\n",
540                        oscc->oscc_obd->obd_name, oscc->oscc_next_id - 1);
541
542                 /* delete from next_id on up */
543                 oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
544                 oa->o_id = oscc->oscc_next_id - 1;
545                 oa->o_gr = 0;
546
547                 rc = osc_real_create(exp, oa, ea, NULL);
548
549                 spin_lock(&oscc->oscc_lock);
550                 oscc->oscc_flags &= ~OSCC_FLAG_SYNC_IN_PROGRESS;
551                 if (rc == 0 || rc == -ENOSPC) {
552                         struct obd_connect_data *ocd;
553                         struct obd_import *imp = oscc->oscc_obd->u.cli.cl_import;
554
555                         if (rc == -ENOSPC)
556                                 oscc->oscc_flags |= OSCC_FLAG_NOSPC;
557                         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
558                         oscc->oscc_last_id = oa->o_id;
559
560                         ocd = &imp->imp_connect_data;
561                         if (ocd->ocd_connect_flags & OBD_CONNECT_SKIP_ORPHAN) {
562                                 CDEBUG(D_HA, "%s: Skip orphan set, reset last "
563                                        "objid\n", oscc->oscc_obd->obd_name);
564                                 oscc->oscc_next_id = oa->o_id + 1;
565                         }
566
567                         CDEBUG(D_HA, "%s: oscc recovery finished, last_id: "
568                                LPU64", rc: %d\n", oscc->oscc_obd->obd_name,
569                                oscc->oscc_last_id, rc);
570                 } else {
571                         CDEBUG(D_ERROR, "%s: oscc recovery failed: %d\n",
572                                oscc->oscc_obd->obd_name, rc);
573                 }
574
575                 cfs_waitq_signal(&oscc->oscc_waitq);
576                 spin_unlock(&oscc->oscc_lock);
577
578                 if (rc < 0)
579                         RETURN(rc);
580         }
581
582         lsm = *ea;
583         if (lsm == NULL) {
584                 rc = obd_alloc_memmd(exp, &lsm);
585                 if (rc < 0)
586                         RETURN(rc);
587         }
588
589         while (1) {
590                 if (oscc_in_sync(oscc))
591                         CDEBUG(D_HA,"%s: oscc recovery in progress, waiting\n",
592                                oscc->oscc_obd->obd_name);
593
594                 rc = oscc_precreate(oscc);
595                 if (rc) {
596                         CDEBUG(D_HA,"%s: error create %d\n",
597                                oscc->oscc_obd->obd_name, rc);
598                         break;
599                 }
600
601                 spin_lock(&oscc->oscc_lock);
602                 if (oscc->oscc_flags & OSCC_FLAG_EXITING) {
603                         spin_unlock(&oscc->oscc_lock);
604                         break;
605                 }
606                 /* wakeup but recovery not finished */
607                 if (oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
608                         rc = -EIO;
609                         spin_unlock(&oscc->oscc_lock);
610                         break;
611                 }
612
613                 if (oscc_has_objects_nolock(oscc, 1)) {
614                         memcpy(oa, &oscc->oscc_oa, sizeof(*oa));
615                         oa->o_id = oscc->oscc_next_id;
616                         lsm->lsm_object_id = oscc->oscc_next_id;
617                         *ea = lsm;
618                         oscc->oscc_next_id++;
619                         spin_unlock(&oscc->oscc_lock);
620
621                         CDEBUG(D_RPCTRACE, "%s: set oscc_next_id = "LPU64"\n",
622                                exp->exp_obd->obd_name, oscc->oscc_next_id);
623                         break;
624                 } else if (oscc->oscc_flags & OSCC_FLAG_NOSPC) {
625                         rc = -ENOSPC;
626                         spin_unlock(&oscc->oscc_lock);
627                         break;
628                 }
629
630                 spin_unlock(&oscc->oscc_lock);
631         }
632
633         if (rc == 0)
634                 CDEBUG(D_INFO, "%s: returning objid "LPU64"\n",
635                        obd2cli_tgt(oscc->oscc_obd), lsm->lsm_object_id);
636         else if (*ea == NULL)
637                 obd_free_memmd(exp, &lsm);
638         RETURN(rc);
639 }
640
641 void oscc_init(struct obd_device *obd)
642 {
643         struct osc_creator *oscc;
644
645         if (obd == NULL)
646                 return;
647
648         oscc = &obd->u.cli.cl_oscc;
649
650         memset(oscc, 0, sizeof(*oscc));
651
652         cfs_waitq_init(&oscc->oscc_waitq);
653         spin_lock_init(&oscc->oscc_lock);
654         oscc->oscc_obd = obd;
655         oscc->oscc_grow_count = OST_MIN_PRECREATE;
656         oscc->oscc_max_grow_count = OST_MAX_PRECREATE;
657
658         oscc->oscc_next_id = 2;
659         oscc->oscc_last_id = 1;
660         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
661
662         CFS_INIT_LIST_HEAD(&oscc->oscc_wait_create_list);
663
664         /* XXX the export handle should give the oscc the last object */
665         /* oed->oed_oscc.oscc_last_id = exph->....; */
666 }
667
668 void oscc_fini(struct obd_device *obd)
669 {
670         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
671         ENTRY;
672
673
674         spin_lock(&oscc->oscc_lock);
675         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
676         oscc->oscc_flags |= OSCC_FLAG_EXITING;
677         spin_unlock(&oscc->oscc_lock);
678 }