Whamcloud - gitweb
Commented proc code
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
index f1293aa..3731f7d 100644 (file)
  * by Cluster File Systems, Inc.
  */
 
-#define EXPORT_SYMTAB
 #define DEBUG_SUBSYSTEM S_LDLM
 
 #include <linux/lustre_dlm.h>
+#include <linux/obd.h>
 
-#define LOOPBACK(x) (((x) & cpu_to_be32(0xff000000)) == cpu_to_be32(0x7f000000))
-
-static int is_local_conn(struct ptlrpc_connection *conn)
+int ldlm_completion_ast(struct ldlm_lock *lock, int flags)
 {
         ENTRY;
-        if (conn == NULL)
-                RETURN(1);
 
-        RETURN(LOOPBACK(conn->c_peer.peer_nid));
+        if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
+                      LDLM_FL_BLOCK_CONV)) {
+                /* Go to sleep until the lock is granted. */
+                /* FIXME: or cancelled. */
+                LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
+                           " sleeping");
+                ldlm_lock_dump(lock);
+                ldlm_reprocess_all(lock->l_resource);
+                wait_event(lock->l_waitq, (lock->l_req_mode ==
+                                           lock->l_granted_mode));
+                LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
+        } else if (flags == LDLM_FL_WAIT_NOREPROC) {
+                wait_event(lock->l_waitq, (lock->l_req_mode ==
+                                           lock->l_granted_mode));
+        } else if (flags == 0) {
+                wake_up(&lock->l_waitq);
+        }
+
+        RETURN(0);
+}
+
+static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
+                                  struct lustre_handle *parent_lockh,
+                                  __u64 *res_id,
+                                  __u32 type,
+                                  void *cookie, int cookielen,
+                                  ldlm_mode_t mode,
+                                  int *flags,
+                                  ldlm_completion_callback completion,
+                                  ldlm_blocking_callback blocking,
+                                  void *data,
+                                  __u32 data_len,
+                                  struct lustre_handle *lockh)
+{
+        struct ldlm_lock *lock;
+        int err;
+
+        if (ns->ns_client) {
+                CERROR("Trying to cancel local lock\n");
+                LBUG();
+        }
+
+        lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode, NULL, 0);
+        if (!lock)
+                GOTO(out_nolock, err = -ENOMEM);
+        LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
+
+        ldlm_lock_addref_internal(lock, mode);
+        ldlm_lock2handle(lock, lockh);
+        lock->l_connh = NULL;
+
+        err = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
+                                blocking);
+        if (err != ELDLM_OK)
+                GOTO(out, err);
+
+        if (type == LDLM_EXTENT)
+                memcpy(cookie, &lock->l_extent, sizeof(lock->l_extent));
+        if ((*flags) & LDLM_FL_LOCK_CHANGED)
+                memcpy(res_id, lock->l_resource->lr_name, sizeof(*res_id));
+
+        LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)",
+                          lock);
+
+        if (lock->l_completion_ast)
+                lock->l_completion_ast(lock, *flags);
+
+        LDLM_DEBUG(lock, "client-side local enqueue END");
+        EXIT;
+ out:
+        LDLM_LOCK_PUT(lock);
+ out_nolock:
+        return err;
 }
 
-int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
-                     __u32 ns_id,
-                     struct ldlm_handle *parent_lock_handle,
+int ldlm_cli_enqueue(struct lustre_handle *connh,
+                     struct ptlrpc_request *req,
+                     struct ldlm_namespace *ns,
+                     struct lustre_handle *parent_lock_handle,
                      __u64 *res_id,
                      __u32 type,
-                     struct ldlm_extent *req_ex,
+                     void *cookie, int cookielen,
                      ldlm_mode_t mode,
                      int *flags,
-                     ldlm_lock_callback completion,
-                     ldlm_lock_callback blocking,
+                     ldlm_completion_callback completion,
+                     ldlm_blocking_callback blocking,
                      void *data,
                      __u32 data_len,
-                     struct ldlm_handle *lockh,
-                     struct ptlrpc_request **request)
+                     struct lustre_handle *lockh)
 {
-        struct ldlm_handle local_lockh;
         struct ldlm_lock *lock;
         struct ldlm_request *body;
         struct ldlm_reply *reply;
-        struct ptlrpc_request *req = NULL;
-        char *bufs[2] = {NULL, data};
-        int rc, size[2] = {sizeof(*body), data_len};
-        ldlm_error_t err;
+        int rc, size = sizeof(*body), req_passed_in = 1;
         ENTRY;
 
-        err = ldlm_local_lock_create(ns_id, parent_lock_handle, res_id,
-                                     type, &local_lockh);
-        if (err != ELDLM_OK)
-                RETURN(err);
-
-        lock = ldlm_handle2object(&local_lockh);
-        /* Is this lock locally managed? */
-        if (is_local_conn(conn))
-                GOTO(local, 0);
-
-        req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 2, size, bufs);
-        if (!req)
-                GOTO(out, rc = -ENOMEM);
+        if (connh == NULL)
+                return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id,
+                                              type, cookie, cookielen, mode,
+                                              flags, completion, blocking, data,
+                                              data_len, lockh);
+
+        *flags = 0;
+        lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode,
+                                data, data_len);
+        if (lock == NULL)
+                GOTO(out_nolock, rc = -ENOMEM);
+        LDLM_DEBUG(lock, "client-side enqueue START");
+        /* for the local lock, add the reference */
+        ldlm_lock_addref_internal(lock, mode);
+        ldlm_lock2handle(lock, lockh);
+
+        if (req == NULL) {
+                req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_ENQUEUE, 1,
+                                      &size, NULL);
+                if (!req)
+                        GOTO(out, rc = -ENOMEM);
+                req_passed_in = 0;
+        } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
+                LBUG();
 
         /* Dump all of this data into the request buffer */
         body = lustre_msg_buf(req->rq_reqmsg, 0);
-        body->lock_desc.l_resource.lr_ns_id = ns_id;
-        body->lock_desc.l_resource.lr_type = type;
-        memcpy(body->lock_desc.l_resource.lr_name, res_id,
-               sizeof(body->lock_desc.l_resource.lr_name));
-
-        body->lock_desc.l_req_mode = mode;
-        if (req_ex)
-                memcpy(&body->lock_desc.l_extent, req_ex,
+        ldlm_lock2desc(lock, &body->lock_desc);
+        /* Phil: make this part of ldlm_lock2desc */
+        if (type == LDLM_EXTENT)
+                memcpy(&body->lock_desc.l_extent, cookie,
                        sizeof(body->lock_desc.l_extent));
-        body->flags = *flags;
-
-        memcpy(&body->lock_handle1, &local_lockh, sizeof(body->lock_handle1));
+        body->lock_flags = *flags;
 
+        memcpy(&body->lock_handle1, lockh, sizeof(*lockh));
         if (parent_lock_handle)
                 memcpy(&body->lock_handle2, parent_lock_handle,
                        sizeof(body->lock_handle2));
 
         /* Continue as normal. */
-        size[0] = sizeof(*reply);
-        req->rq_replen = lustre_msg_size(1, size);
+        if (!req_passed_in) {
+                size = sizeof(*reply);
+                req->rq_replen = lustre_msg_size(1, &size);
+        }
+        lock->l_connh = connh;
+        lock->l_export = NULL;
 
         rc = ptlrpc_queue_wait(req);
+        /* FIXME: status check here? */
         rc = ptlrpc_check_status(req, rc);
+
         if (rc != ELDLM_OK) {
-                ldlm_resource_put(lock->l_resource);
-                ldlm_lock_free(lock);
+                LDLM_DEBUG(lock, "client-side enqueue END (%s)",
+                           rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
+                ldlm_lock_decref(lockh, mode);
+                /* FIXME: if we've already received a completion AST, this will
+                 * LBUG! */
+                ldlm_lock_destroy(lock);
                 GOTO(out, rc);
         }
 
-        lock->l_connection = conn;
         reply = lustre_msg_buf(req->rq_repmsg, 0);
         memcpy(&lock->l_remote_handle, &reply->lock_handle,
                sizeof(lock->l_remote_handle));
-        *flags = reply->flags;
+        if (type == LDLM_EXTENT)
+                memcpy(cookie, &reply->lock_extent, sizeof(reply->lock_extent));
+        *flags = reply->lock_flags;
 
-        CERROR("remote handle: %p, flags: %d\n",
+        CDEBUG(D_INFO, "remote handle: %p, flags: %d\n",
                (void *)(unsigned long)reply->lock_handle.addr, *flags);
-        CERROR("extent: %Lu -> %Lu\n", reply->lock_extent.start,
-               reply->lock_extent.end);
-
-        EXIT;
- local:
-        rc = ldlm_local_lock_enqueue(&local_lockh, mode, req_ex, flags,
-                                     completion, blocking, data, data_len);
-        if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
-                     LDLM_FL_BLOCK_CONV)) {
-                /* Go to sleep until the lock is granted. */
-                /* FIXME: or cancelled. */
-                wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
-                                         lock->l_granted_mode);
+        CDEBUG(D_INFO, "extent: %Lu -> %Lu\n",
+               (unsigned long long)reply->lock_extent.start,
+               (unsigned long long)reply->lock_extent.end);
+
+        /* If enqueue returned a blocked lock but the completion handler has
+         * already run, then it fixed up the resource and we don't need to do it
+         * again. */
+        if ((*flags) & LDLM_FL_LOCK_CHANGED) {
+                int newmode = reply->lock_mode;
+                if (newmode && newmode != lock->l_req_mode) {
+                        LDLM_DEBUG(lock, "server returned different mode %s",
+                                   ldlm_lockname[newmode]);
+                        lock->l_req_mode = newmode;
+                }
+
+                if (reply->lock_resource_name[0] !=
+                    lock->l_resource->lr_name[0]) {
+                        CDEBUG(D_INFO, "remote intent success, locking %ld "
+                               "instead of %ld\n",
+                               (long)reply->lock_resource_name[0],
+                               (long)lock->l_resource->lr_name[0]);
+
+                        ldlm_lock_change_resource(lock,
+                                                  reply->lock_resource_name);
+                        if (lock->l_resource == NULL) {
+                                LBUG();
+                                RETURN(-ENOMEM);
+                        }
+                        LDLM_DEBUG(lock, "client-side enqueue, new resource");
+                }
         }
- out:
-        *request = req;
-        return rc;
-}
 
-int ldlm_cli_namespace_new(struct obd_device *obddev, struct ptlrpc_client *cl,
-                           struct ptlrpc_connection *conn, __u32 ns_id)
-{
-        struct ldlm_namespace *ns;
-        struct ldlm_request *body;
-        struct ptlrpc_request *req;
-        int rc, size = sizeof(*body);
-        ENTRY;
+        if (!req_passed_in)
+                ptlrpc_free_req(req);
 
-        if (is_local_conn(conn))
-                GOTO(local, 0);
-
-        req = ptlrpc_prep_req(cl, conn, LDLM_NAMESPACE_NEW, 1, &size, NULL);
-        if (!req)
-                GOTO(out, rc = -ENOMEM);
-
-        body = lustre_msg_buf(req->rq_reqmsg, 0);
-        body->lock_desc.l_resource.lr_ns_id = ns_id;
-
-        req->rq_replen = lustre_msg_size(0, NULL);
-
-        rc = ptlrpc_queue_wait(req);
-        rc = ptlrpc_check_status(req, rc);
-        ptlrpc_free_req(req);
-        if (rc)
-                GOTO(out, rc);
+        rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
+                               blocking);
+        if (lock->l_completion_ast)
+                lock->l_completion_ast(lock, *flags);
 
+        LDLM_DEBUG(lock, "client-side enqueue END");
         EXIT;
- local:
-        rc = ldlm_namespace_new(obddev, ns_id, &ns);
-        if (rc != ELDLM_OK) {
-                /* XXX: It succeeded remotely but failed locally. What to do? */
-                CERROR("Local ldlm_namespace_new failed.\n");
-        }
  out:
+        LDLM_LOCK_PUT(lock);
+ out_nolock:
         return rc;
 }
 
-int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
-                      void *data, __u32 data_len)
+int ldlm_match_or_enqueue(struct lustre_handle *connh,
+                          struct ptlrpc_request *req,
+                          struct ldlm_namespace *ns,
+                          struct lustre_handle *parent_lock_handle,
+                          __u64 *res_id,
+                          __u32 type,
+                          void *cookie, int cookielen,
+                          ldlm_mode_t mode,
+                          int *flags,
+                          ldlm_completion_callback completion,
+                          ldlm_blocking_callback blocking,
+                          void *data,
+                          __u32 data_len,
+                          struct lustre_handle *lockh)
 {
-        struct ldlm_request *body;
-        struct ptlrpc_request *req;
-        struct obd_device *obddev = lock->l_resource->lr_namespace->ns_obddev;
-        struct ptlrpc_client *cl = obddev->u.ldlm.ldlm_client;
-        int rc, size[2] = {sizeof(*body), data_len};
-        char *bufs[2] = {NULL, data};
+        int rc;
         ENTRY;
+        rc = ldlm_lock_match(ns, res_id, type, cookie, cookielen, mode, lockh);
+        if (rc == 0) {
+                rc = ldlm_cli_enqueue(connh, req, ns,
+                                      parent_lock_handle, res_id, type, cookie,
+                                      cookielen, mode, flags, completion,
+                                      blocking, data, data_len, lockh);
+                if (rc != ELDLM_OK)
+                        CERROR("ldlm_cli_enqueue: err: %d\n", rc);
+                RETURN(rc);
+        } else
+                RETURN(0);
+}
 
-        req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 2, size,
-                              bufs);
-        if (!req)
-                GOTO(out, rc = -ENOMEM);
-
-        body = lustre_msg_buf(req->rq_reqmsg, 0);
-        memcpy(&body->lock_handle1, &lock->l_remote_handle,
-               sizeof(body->lock_handle1));
+static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
+                                  int *flags)
+{
 
-        if (new != NULL) {
-                ldlm_lock2desc(new, &body->lock_desc);
-                ldlm_object2handle(new, &body->lock_handle2);
+        if (lock->l_resource->lr_namespace->ns_client) {
+                CERROR("Trying to cancel local lock\n");
+                LBUG();
         }
+        LDLM_DEBUG(lock, "client-side local convert");
 
-        req->rq_replen = lustre_msg_size(0, NULL);
+        ldlm_lock_convert(lock, new_mode, flags);
+        ldlm_reprocess_all(lock->l_resource);
 
-        rc = ptlrpc_queue_wait(req);
-        rc = ptlrpc_check_status(req, rc);
-        ptlrpc_free_req(req);
-
-        EXIT;
- out:
-        return rc;
+        LDLM_DEBUG(lock, "client-side local convert handler END");
+        LDLM_LOCK_PUT(lock);
+        RETURN(0);
 }
 
-int ldlm_cli_convert(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
-                     int new_mode, int *flags, struct ptlrpc_request **request)
+int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
 {
         struct ldlm_request *body;
+        struct lustre_handle *connh;
         struct ldlm_reply *reply;
         struct ldlm_lock *lock;
+        struct ldlm_resource *res;
         struct ptlrpc_request *req;
-        int rc, size[2] = {sizeof(*body), lock->l_data_len};
-        char *bufs[2] = {NULL, lock->l_data};
+        int rc, size = sizeof(*body);
         ENTRY;
 
-        lock = ldlm_handle2object(lockh);
+        lock = ldlm_handle2lock(lockh);
+        if (!lock) {
+                LBUG();
+                RETURN(-EINVAL);
+        }
+        *flags = 0;
+        connh = lock->l_connh;
 
-        if (is_local_conn(lock->l_connection))
-                GOTO(local, 0);
+        if (!connh)
+                return ldlm_cli_convert_local(lock, new_mode, flags);
 
-        req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 2, size,
-                              bufs);
+        LDLM_DEBUG(lock, "client-side convert");
+
+        req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_CONVERT, 1, &size,
+                              NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -222,9 +314,10 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
                sizeof(body->lock_handle1));
 
         body->lock_desc.l_req_mode = new_mode;
-        body->flags = *flags;
+        body->lock_flags = *flags;
 
-        req->rq_replen = lustre_msg_size(1, size);
+        size = sizeof(*reply);
+        req->rq_replen = lustre_msg_size(1, &size);
 
         rc = ptlrpc_queue_wait(req);
         rc = ptlrpc_check_status(req, rc);
@@ -232,51 +325,130 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
                 GOTO(out, rc);
 
         reply = lustre_msg_buf(req->rq_repmsg, 0);
-        *flags = reply->flags;
-
+        res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
+        if (res != NULL)
+                ldlm_reprocess_all(res);
+        /* Go to sleep until the lock is granted. */
+        /* FIXME: or cancelled. */
+        if (lock->l_completion_ast)
+                lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC);
         EXIT;
- local:
-        rc = ldlm_local_lock_convert(lockh, new_mode, flags);
  out:
-        *request = req;
+        LDLM_LOCK_PUT(lock);
+        ptlrpc_free_req(req);
         return rc;
 }
 
-int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
-                    struct ptlrpc_request **request)
+int ldlm_cli_cancel(struct lustre_handle *lockh)
 {
-        struct ldlm_request *body;
-        struct ldlm_lock *lock;
         struct ptlrpc_request *req;
-        int rc, size[2] = {sizeof(*body), lock->l_data_len};
-        char *bufs[2] = {NULL, lock->l_data};
+        struct ldlm_lock *lock;
+        struct ldlm_request *body;
+        int rc = 0, size = sizeof(*body);
+        ENTRY;
+
+        lock = ldlm_handle2lock(lockh);
+        if (!lock) {
+                /* It's possible that the decref that we did just before this
+                 * cancel was the last reader/writer, and caused a cancel before
+                 * we could call this function.  If we want to make this
+                 * impossible (by adding a dec_and_cancel() or similar), then
+                 * we can put the LBUG back. */
+                //LBUG();
+                RETURN(-EINVAL);
+        }
+
+        if (lock->l_connh) {
+                LDLM_DEBUG(lock, "client-side cancel");
+                /* Set this flag to prevent others from getting new references*/
+                l_lock(&lock->l_resource->lr_namespace->ns_lock);
+                lock->l_flags |= LDLM_FL_CBPENDING;
+                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
+                req = ptlrpc_prep_req(class_conn2cliimp(lock->l_connh), 
+                                      LDLM_CANCEL, 1, &size, NULL);
+                if (!req)
+                        GOTO(out, rc = -ENOMEM);
+
+                body = lustre_msg_buf(req->rq_reqmsg, 0);
+                memcpy(&body->lock_handle1, &lock->l_remote_handle,
+                       sizeof(body->lock_handle1));
+
+                req->rq_replen = lustre_msg_size(0, NULL);
+
+                rc = ptlrpc_queue_wait(req);
+                rc = ptlrpc_check_status(req, rc);
+                ptlrpc_free_req(req);
+                if (rc != ELDLM_OK)
+                        GOTO(out, rc);
+
+                ldlm_lock_cancel(lock);
+        } else {
+                LDLM_DEBUG(lock, "client-side local cancel");
+                if (lock->l_resource->lr_namespace->ns_client) {
+                        CERROR("Trying to cancel local lock\n");
+                        LBUG();
+                }
+                ldlm_lock_cancel(lock);
+                ldlm_reprocess_all(lock->l_resource);
+                LDLM_DEBUG(lock, "client-side local cancel handler END");
+        }
+
+        EXIT;
+ out:
+        LDLM_LOCK_PUT(lock);
+        return rc;
+}
+
+/* Cancel all locks on a given resource that have 0 readers/writers */
+int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id)
+{
+        struct ldlm_resource *res;
+        struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
+        struct ldlm_ast_work *w;
         ENTRY;
 
-        lock = ldlm_handle2object(lockh);
+        res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
+        if (res == NULL)
+                RETURN(-EINVAL);
 
-        if (is_local_conn(lock->l_connection))
-                GOTO(local, 0);
+        l_lock(&ns->ns_lock);
+        list_for_each(tmp, &res->lr_granted) {
+                struct ldlm_lock *lock;
+                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
-        req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 2, size,
-                              bufs);
-        if (!req)
-                GOTO(out, rc = -ENOMEM);
+                if (lock->l_readers || lock->l_writers)
+                        continue;
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0);
-        memcpy(&body->lock_handle1, &lock->l_remote_handle,
-               sizeof(body->lock_handle1));
+                /* Setting the CBPENDING flag is a little misleading, but
+                 * prevents an important race; namely, once CBPENDING is set,
+                 * the lock can accumulate no more readers/writers.  Since
+                 * readers and writers are already zero here, ldlm_lock_decref
+                 * won't see this flag and call l_blocking_ast */
+                lock->l_flags |= LDLM_FL_CBPENDING;
 
-        req->rq_replen = lustre_msg_size(0, NULL);
+                OBD_ALLOC(w, sizeof(*w));
+                LASSERT(w);
 
-        rc = ptlrpc_queue_wait(req);
-        rc = ptlrpc_check_status(req, rc);
-        if (rc != ELDLM_OK)
-                GOTO(out, rc);
+                w->w_lock = LDLM_LOCK_GET(lock);
+                list_add(&w->w_list, &list);
+        }
+        l_unlock(&ns->ns_lock);
 
-        EXIT;
- local:
-        rc = ldlm_local_lock_cancel(lockh);
- out:
-        *request = req;
-        return rc;
+        list_for_each_safe(tmp, next, &list) {
+                struct lustre_handle lockh;
+                int rc;
+                w = list_entry(tmp, struct ldlm_ast_work, w_list);
+
+                ldlm_lock2handle(w->w_lock, &lockh);
+                rc = ldlm_cli_cancel(&lockh);
+                if (rc != ELDLM_OK)
+                        CERROR("ldlm_cli_cancel: %d\n", rc);
+
+                LDLM_LOCK_PUT(w->w_lock);
+                list_del(&w->w_list);
+                OBD_FREE(w, sizeof(*w));
+        }
+
+        RETURN(0);
 }