args: {
name: "",
type: "",
+ host: "",
data: {},
},
call: function(req) {
try {
- core.handle_request(null, req, req.args, true);
+ let host = req.args.host;
+ delete req.args.host;
+ core.handle_request(null, req, req.args, true, host);
} catch (e) {
core.exception(e);
}
this.channel.request("subscribe", { name });
}
-function send(name, type, data)
+function send_ext(data)
{
this.channel.request({
method: "message",
return: "ignore",
- data: {
- name, type, data
- },
+ data
+ });
+}
+
+function send_host(host, name, type, data)
+{
+ this.send_ext({
+ host, name, type, data
+ });
+}
+
+function send(name, type, data)
+{
+ this.send_ext({
+ name, type, data
});
}
{
}
-function request(name, type, data, data_cb, complete_cb)
+function request_ext(data, data_cb, complete_cb)
{
if (!this.channel)
this.connect();
let req = this.channel.defer({
method: "request",
- data: {
- name, type, data
- },
+ data,
data_cb,
cb: complete_cb
});
req.await();
}
+function request_host(host, name, type, data, data_cb, complete_cb)
+{
+ return this.request_ext({
+ host, name, type, data
+ }, data_cb, complete_cb);
+}
+
+function request(name, type, data, data_cb, complete_cb)
+{
+ return this.request_ext({
+ name, type, data
+ }, data_cb, complete_cb);
+}
+
function connect()
{
if (this.channel)
}
const client_proto = {
- connect, publish, subscribe, send, request,
+ connect, publish, subscribe,
+ send, send_ext, send_host,
+ request, request_ext, request_host,
close: function() {
for (let sub in this.sub_cb) {
if (!sub.timer)
if (type(name) != "string" || type(args.type) != "string" || type(args.data) != "object")
return libubus.STATUS_INVALID_ARGUMENT;
- let data = prepare_data(req.args);
+ let data = prepare_data(args);
let handle;
switch (req.type) {
case "message":
handle = cl.publish[name];
if (!handle)
return libubus.STATUS_INVALID_ARGUMENT;
- return core.handle_message(handle, data, true);
+ return core.handle_message(handle, data, true, args.host);
case "request":
handle = cl.subscribe[name];
if (!handle &&
return libubus.STATUS_PERMISSION_DENIED;
handle ??= { client: cl.id };
- return core.handle_request(handle, req, data, true);
+ return core.handle_request(handle, req, data, true, args.host);
}
}
remote.pubsub_set(kind, name, length(list) > 0);
}
-function get_handles(handle, local, remote)
+function get_handles(handle, local, remote, host)
{
let handles = [];
+ if (host == "")
+ remote = {};
+ else if (host != null)
+ local = {};
+
for (let cur_id, cur in local) {
if (handle) {
if (handle.id == cur_id)
if (!remote)
return handles;
- for (let cur_id, cur in remote)
+ for (let cur_id, cur in remote) {
+ if (host != null && cur.name != host)
+ continue;
push(handles, cur);
+ }
return handles;
}
-function handle_request(handle, req, data, remote)
+function handle_request(handle, req, data, remote, host)
{
let name = data.name;
let local = this.publish[name];
if (remote)
remote = this.remote_publish[name];
- let handles = get_handles(handle, local, remote);
+ let handles = get_handles(handle, local, remote, host);
let context = {
pending: length(handles),
}
}
-function handle_message(handle, data, remote)
+function handle_message(handle, data, remote, host)
{
let name = data.name;
let local = this.subscribe[name];
if (remote)
remote = this.remote_subscribe[name];
- let handles = get_handles(handle, local, remote);
+ let handles = get_handles(handle, local, remote, host);
for (let cur in handles) {
if (!cur || !cur.get_channel)
continue;