分布式任务调度开发指导

场景介绍

开发者在应用中集成分布式调度能力,通过调用指定能力的分布式接口,实现跨设备能力调度。根据Ability模板及意图的不同,分布式任务调度向开发者提供以下六种能力:启动远程FA、启动远程PA、关闭远程PA、连接远程PA、断开连接远程PA和FA跨设备迁移。下面以设备A(本地设备)和设备B(远端设备)为例,进行场景介绍:

  1. 设备A启动设备B的FA:在设备A上通过本地应用提供的启动按钮,启动设备B上对应的FA。例如:设备A控制设备B打开相册,只需开发者在启动FA时指定打开相册的意图即可。
  2. 设备A启动设备B的PA:在设备A上通过本地应用提供的启动按钮,启动设备B上指定的PA。例如:开发者在启动远程服务时通过意图指定音乐播放服务,即可实现设备A启动设备B音乐播放的能力。
  3. 设备A关闭设备B的PA:在设备A上通过本地应用提供的关闭按钮,关闭设备B上指定的PA。类似启动的过程,开发者在关闭远程服务时通过意图指定音乐播放服务,即可实现关闭设备B上该服务的能力。
  4. 设备A连接设备B的PA:在设备A上通过本地应用提供的连接按钮,连接设备B上指定的PA。连接后,通过其他功能相关按钮实现控制对端PA的能力。通过连接关系,开发者可以实现跨设备的同步服务调度,实现如大型计算任务互助等价值场景。
  5. 设备A与设备B的PA断开连接:在设备A上通过本地应用提供断开连接的按钮,将之前已连接的PA断开连接。
  6. 设备A的FA迁移至设备B:设备A上通过本地应用提供的迁移按钮,将设备A的业务无缝迁移到设备B中。通过业务迁移能力,打通设备A和设备B间的壁垒,实现如文档跨设备编辑、视频从客厅到房间跨设备接续播放等场景。

接口说明

分布式调度平台提供的连接和断开连接PA、启动远程FA、启动和关闭PA以及迁移FA的能力,是实现更多价值性场景的基础。

连接远程PA

connectAbility(Intent intent, IAbilityConnection conn)接口提供连接指定设备上PA的能力,Intent中指定待连接PA的设备deviceId、bundleName和abilityName。当连接成功后,通过在conn定义的onAbilityConnectDone回调中获取对端PA的服务代理,两者的连接关系则由conn维护。具体的参数定义如下表所示:

参数名 类型 说明
intent ohos.aafwk.content.Intent 开发者需在intent对应的Operation中指定待连接PA的设备deviceId、bundleName和abilityName。
conn ohos.aafwk.ability.IAbilityConnection 当连接成功或失败时,作为连接关系的回调接口。该接口提供连接完成和断开连接完成时的处理逻辑,开发者可根据具体的场景进行定义。

启动远程FA/PA

startAbility(Intent intent)接口提供启动指定设备上FA和PA的能力,Intent中指定待启动FA/PA的设备deviceId、bundleName和abilityName。具体参数定义如下表所示:

参数名 类型 说明
intent ohos.aafwk.content.Intent 当开发者需要调用该接口启动远程PA时,需要指定待启动PA的设备deviceId、bundleName和abilityName。若不指定设备deviceId,则无法跨设备调用PA。类似地,在启动FA时,也需要开发者指定启动FA的设备deviceId、bundleName和abilityName。

分布式调度平台还会提供与上述功能相对应的断开远程PA的连接和关闭远程PA的接口,相关的参数与连接、启动的接口类似。

  • 断开远程PA连接:disconnectAbility(IAbilityConnection conn)。
  • 关闭远程PA:stopAbility(Intent intent)。

迁移FA

continueAbility(String deviceId)接口提供将本地FA迁移到指定设备上的能力,需要开发者在调用时指定目标设备的deviceId。具体参数定义如下表所示:

说明

Ability和AbilitySlice类均需要实现IAbilityContinuation及其方法,才可以实现FA迁移。

参数名 类型 说明
deviceId String 当开发者需要调用该接口将本地FA迁移时,需要指定目标设备的deviceId。

开发步骤

  1. 导入功能依赖的包。

    // 以下依赖包含分布式调度平台开放的接口// 用于:连接/断开连接远程PA、启动远程FA、通过连接关系实现对PA的控制import ohos.aafwk.ability.AbilitySlice;import ohos.aafwk.ability.IAbilityConnection;import ohos.aafwk.content.Intent;import ohos.aafwk.content.Operation;import ohos.bundle.ElementName;// 为了实现迁移能力,需要引入传递迁移所需数据的包以及实现迁移能力的接口import ohos.aafwk.ability.IAbilityContinuation;import ohos.aafwk.content.IntentParams;// 为了实现跨设备指令及数据通信,需要使用RPC接口import ohos.rpc.IRemoteObject;import ohos.rpc.IRemoteBroker;import ohos.rpc.MessageParcel;import ohos.rpc.MessageOption;import ohos.rpc.RemoteException;import ohos.rpc.RemoteObject;//(可选)多设备场景下涉及设备选择,为此需要引入组网设备发现的能力import ohos.distributedschedule.interwork.DeviceInfo;import ohos.distributedschedule.interwork.DeviceManager;// (可选)设计界面相关的包函数,对FA界面及按钮进行绘制import ohos.agp.components.Button;import ohos.agp.components.Component;import ohos.agp.components.Component.ClickedListener;import ohos.agp.components.ComponentContainer.LayoutConfig;import ohos.agp.components.element.ShapeElement;import ohos.agp.components.PositionLayout;
    
  2. (可选)编写一个基本的FA用于使用分布式能力。

    // 调用AbilitySlice模板实现一个用于控制基础功能的FA,AbilitySlice的代码示例如下:public class SampleSlice extends AbilitySlice {    @Override    public void onStart(Intent intent) {        super.onStart(intent);        // 开发者可以自行进行界面设计        // 为按钮设置统一的背景色        // 例如通过PositionLayout可以实现简单界面        PositionLayout layout = new PositionLayout(this);        LayoutConfig config = new LayoutConfig(LayoutConfig.MATCH_PARENT, LayoutConfig.MATCH_PARENT);        layout.setLayoutConfig(config);        ShapeElement buttonBg = new ShapeElement();        buttonBg.setRgbColor(new RgbColor(0, 125, 255));        addComponents(layout, buttonBg, config);        super.setUIContent(layout);    }
        @Override    public void onInactive() {        super.onInactive();    }
        @Override    public void onActive() {        super.onActive();    }
        @Override    public void onBackground() {        super.onBackground();    }
        @Override    public void onForeground(Intent intent) {        super.onForeground(intent);    }
        @Override    public void onStop() {        super.onStop();    }}
    

    说明

    此步骤展示了一个简单FA的实现过程,实际开发中请开发者根据需要进行设计。

  3. 使用分布式能力要求开发者在Ability对应的config.json中声明多设备协同访问的权限:三方应用使用{"name": "ohos.permission.DISTRIBUTED_DATASYNC"}。

    一个三方应用部署的示例如下:

    {    "reqPermissions": [        {            "name": "ohos.permission.DISTRIBUTED_DATASYNC"        }    ]}
    

    此外,对于三方应用还要求在实现Ability的代码中显式声明需要使用的权限,如下所示:

    public class SampleSlice extends AbilitySlice {    @Override    public void onStart(Intent intent) {        // 开发者显示声明需要使用的权限        requestPermissionsFromUser(new String[]{"ohos.permission.DISTRIBUTED_DATASYNC"}, 0);        super.onStart(intent);            }}
    
  4. (可选)为不同的能力设置相应的控制按钮。

    // 建议开发者按照自己的界面进行按钮设计
    // 开发者可以自行实现如下createButton的方法,新建一个显示文字text,背景色为buttonBg以及大小尺寸位置符合config设置的按钮,用来与用户交互// private Button createButton(String text, ShapeElement buttonBg, LayoutConfig config)// 按照顺序在PositionLayout中依次添加按钮的示例private void addComponents(PositionLayout linear, ShapeElement buttonBg, LayoutConfig config) {    // 构建远程启动FA的按钮    btnStartRemoteFA = createButton("StartRemoteFA", buttonBg, config);    btnStartRemoteFA.setClickedListener(mStartRemoteFAListener);    linear.addComponent(btnStartRemoteFA);    // 构建远程启动PA的按钮    btnStartRemotePA = createButton("StartRemotePA", buttonBg, config);    btnStartRemotePA.setClickedListener(mStartRemotePAListener);    linear.addComponent(btnStartRemotePA);    // 构建远程关闭PA的按钮    btnStopRemotePA = createButton("StopRemotePA", buttonBg, config);    btnStopRemotePA.setClickedListener(mStopRemotePAListener);    linear.addComponent(btnStopRemotePA);    // 构建连接远程PA的按钮    btnConnectRemotePA = createButton("ConnectRemotePA", buttonBg, config);    btnConnectRemotePA.setClickedListener(mConnectRemotePAListener);    linear.addComponent(btnConnectRemotePA);    // 构建控制连接PA的按钮    btnControlRemotePA = createButton("ControlRemotePA", buttonBg, config);    btnControlRemotePA.setClickedListener(mControlPAListener);    linear.addComponent(btnControlRemotePA);    // 构建与远程PA断开连接的按钮    btnDisconnectRemotePA = createButton("DisconnectRemotePA", buttonBg, config);    btnDisconnectRemotePA.setClickedListener(mDisconnectRemotePAListener);    linear.addComponent(btnDisconnectRemotePA);    // 构建迁移FA的按钮    btnContinueRemoteFA = createButton("ContinueRemoteFA", buttonBg, config);    btnContinueRemoteFA.setClickedListener(mContinueAbilityListener);    linear.addComponent(btnContinueRemoteFA);}
    

    说明

    此处只展示了基于按钮控制的能力调度方法,实际开发中请开发者根据需要选择能力调度方式。代码示例中未体现按钮如位置、样式等具体的设置方法,详请参考JAVA UI框架

  5. 通过设备管理DeviceManager提供的getDeviceList接口获取设备列表,用于指定目标设备。

    // ISelectResult是一个自定义接口,用来处理指定设备deviceId后执行的行为 interface ISelectResult {     void onSelectResult(String deviceId); }
    // 获得设备列表,开发者可在得到的在线设备列表中选择目标设备执行操作private void scheduleRemoteAbility(ISelectResult listener) {    // 调用DeviceManager的getDeviceList接口,通过FLAG_GET_ONLINE_DEVICE标记获得在线设备列表    List<DeviceInfo> onlineDevices = DeviceManager.getDeviceList(DeviceInfo.FLAG_GET_ONLINE_DEVICE);    // 判断组网设备是否为空    if (onlineDevices.isEmpty()) {        listener.onSelectResult(null);        return;    }    int numDevices = onlineDevices.size();    List<String> deviceIds = new ArrayList<>(numDevices);    onlineDevices.forEach((device) -> {        deviceIds.add(device.getDeviceId());    });    // 以选择首个设备作为目标设备为例    // 开发者也可按照具体场景,通过别的方式进行设备选择    String selectDeviceId = deviceIds.get(0);    listener.onSelectResult(selectDeviceId);    }
    

    上述实例中涉及对在线组网设备的查询,该项能力需要开发者在对应的config.json中声明获取设备列表及设备信息的权限,如下所示:

    {    "reqPermissions": [        {            "name": "ohos.permission.DISTRIBUTED_DEVICE_STATE_CHANGE"        },         {            "name": "ohos.permission.GET_DISTRIBUTED_DEVICE_INFO"        },         {            "name": "ohos.permission.GET_BUNDLE_INFO"        }    ]}
    
  6. 为启动远程FA的按钮设置点击回调,实现启动远程FA的能力。

    // 启动一个指定bundleName和abilityName的FAprivate ClickedListener mStartRemoteFAListener = new ClickedListener() {    @Override    public void onClick(Component arg0) {        // 启动远程PA        scheduleRemoteAbility(new ISelectResult() {            @Override            void onSelectResult(String deviceId) {                if (deviceId != null) {                    // 通过scheduleRemoteAbility指定目标设备deviceId                    // 指定待启动FA的bundleName和abilityName                    // 例如:bundleName = "com.helloworld"                    //       abilityName = "com.helloworld.SampleFeatureAbility"                    // 设置分布式标记,表明当前涉及分布式能力                    Operation operation = new Intent.OperationBuilder()                            .withDeviceId(deviceId)                            .withBundleName(bundleName)                            .withAbilityName(abilityName)                            .withFlags(Intent.FLAG_ABILITYSLICE_MULTI_DEVICE)                            .build();                    Intent intent = new Intent();                    intent.setOperation(operation);                    // 通过AbilitySlice包含的startAbility接口实现跨设备启动FA                    startAbility(intent);                }            }        });    }};
    
  7. 为启动和关闭PA定义回调,实现启动和关闭PA的能力。

    对于PA的启动、关闭、连接等操作都需要开发者提供目标设备的deviceId。开发者可以通过DeviceManager相关接口得到当前组网下的设备列表,并以弹窗的形式供用户选择,也可以按照实际需要实现其他个性化的处理方式。在点击事件回调函数中,需要开发者指定得到deviceId后的处理逻辑,即实现类似上例中listener.onSelectResult(String deviceId)的方法,代码示例如下:

    // 启动远程PAprivate ClickedListener mStartRemotePAListener = new ClickedListener() {    @Override    public void onClick(Component arg0) {        // 启动远程PA        scheduleRemoteAbility(new ISelectResult() {            @Override            void onSelectResult(String deviceId) {                if (deviceId != null) {                    // bundleName和abilityName与待启动PA对应                    // 例如:bundleName = "com.helloworld"                    //       abilityName = "com.helloworld.SampleParticleAbility"                    Operation operation = new Intent.OperationBuilder()                            .withDeviceId(deviceId)                            .withBundleName(bundleName)                            .withAbilityName(abilityName)                            .withFlags(Intent.FLAG_ABILITYSLICE_MULTI_DEVICE)                            .build();                    Intent intentToStartPA = new Intent();                    intentToStartPA.setOperation(operation);                    startAbility(intentToStartPA);                }            }        });    }};
    // 关闭远程PA,和启动类似开发者需要指定待关闭PA对应的bundleName和abilityNameprivate ClickedListener mStopRemotePAListener = new ClickedListener() {    @Override    public void onClick(Component arg0) {        scheduleRemoteAbility(new ISelectResult() {            @Override            void onSelectResult(String deviceId) {                if (deviceId != null) {                    // bundleName和abilityName与待关闭PA对应                    // 例如:bundleName = "com.helloworld"                    //       abilityName = "com.helloworld.SampleParticleAbility"                    Operation operation = new Intent.OperationBuilder()                            .withDeviceId(deviceId)                            .withBundleName(bundleName)                            .withAbilityName(abilityName)                            .withFlags(Intent.FLAG_ABILITYSLICE_MULTI_DEVICE)                            .build();                    Intent intentToStopPA = new Intent();                    intentToStopPA.setOperation(operation);                    stopAbility(intentToStopPA);                }            }        });    }};
    

    说明

    启动和关闭的行为类似,开发者只需在Intent中指定待调度PA的deviceId、bundleName和abilityName,并以operation的形式封装到Intent内。通过AbilitySlice(Ability)包含的startAbility()和stopAbility()接口即可实现相应功能。

  8. 设备A连接设备B侧的PA,利用连接关系调用该PA执行特定任务,以及断开连接。

    // 当连接完成时,用来提供管理已连接PA的能力private MyRemoteProxy mProxy = null;// 用于管理连接关系private IAbilityConnection mConn = new IAbilityConnection() {    @Override    public void onAbilityConnectDone(ElementName element, IRemoteObject remote, int resultCode) {        // 跨设备PA连接完成后,会返回一个序列化的IRemoteObject对象        // 通过该对象得到控制远端服务的代理        mProxy = new MyRemoteProxy(remote);        btnConnectRemotePA.setText("connectRemoteAbility done");    }
        @Override    public void onAbilityDisconnectDone(ElementName element, int resultCode) {        // 当已连接的远端PA关闭时,会触发该回调        // 支持开发者按照返回的错误信息进行PA生命周期管理        disconnectAbility(mConn);    }};
    

    仅通过启动/关闭两种方式对PA进行调度无法应对需长期交互的场景,因此,分布式任务调度平台向开发者提供了跨设备PA连接及断开连接的能力。为了对已连接PA进行管理,开发者需要实现一个满足IAbilityConnection接口的连接状态检测实例,通过该实例可以对连接及断开连接完成时设置具体的处理逻辑,例如:获取控制对端PA的代理等。进一步为了使用该代理跨设备调度PA,开发者需要在本地及对端分别实现对外接口一致的代理。一个具备加法能力的代理示例如下:

    // 以连接提供加法计算能力的PA为例。为了提供跨设备连接能力,需要在本地发起连接侧和对端被连接侧分别实现代理// 发起连接侧的代理示例如下:public class MyRemoteProxy implements IRemoteBroker {    private static final int ERR_OK = 0;    private static final int COMMAND_PLUS = IRemoteObject.MIN_TRANSACTION_ID;    private final IRemoteObject remote;
        public MyRemoteProxy(IRemoteObject remote) {        this.remote = remote;    }
        @Override    public IRemoteObject asObject() {        return remote;    }
        public int plus(int a, int b) throws RemoteException {        MessageParcel data = MessageParcel.obtain();        MessageParcel reply = MessageParcel.obtain();        // option不同的取值,决定采用同步或异步方式跨设备控制PA        // 本例需要同步获取对端PA执行加法的结果,因此采用同步的方式,即MessageOption.TF_SYNC        // 具体MessageOption的设置,可参考相关API文档        MessageOption option = new MessageOption(MessageOption.TF_SYNC);        data.writeInt(a);        data.writeInt(b);
            try {            remote.sendRequest(COMMAND_PLUS, data, reply, option);            int errCode = reply.readInt();            if (errCode != ERR_OK) {                throw new RemoteException();            }            int result = reply.readInt();            return result;        }finally {            data.reclaim();            reply.reclaim();        }    }}
    

    此外,对端待连接的PA需要实现对应的客户端,代码示例如下所示:

    // 以计算加法为例,对端实现的客户端如下public class MyRemote extends RemoteObject implements IRemoteBroker{    private static final int ERR_OK = 0;    private static final int ERROR = -1;    private static final int COMMAND_PLUS = IRemoteObject.MIN_TRANSACTION_ID;
        public MyRemote() {        super("MyService_Remote");    }
        @Override    public IRemoteObject asObject() {        return this;    }
        @Override    public boolean onRemoteRequest(int code, MessageParcel data, MessageParcel reply, MessageOption option) {        if (code != COMMAND_PLUS) {            reply.writeInt(ERROR);            return false;        }        int value1 = data.readInt();        int value2 = data.readInt();        int sum = value1 + value2;        reply.writeInt(ERR_OK);        reply.writeInt(sum);        return true;    }}
    

    对端除了要实现如上所述的客户端外,待连接的PA还需要作如下修改:

    // 为了返回给连接方可调用的代理,需要在该PA中实例化客户端,例如作为该PA的成员变量private MyRemote remote = new MyRemote();// 当该PA接收到连接请求时,即将该客户端转化为代理返回给连接发起侧@Overrideprotected IRemoteObject onConnect(Intent intent) {    super.onConnect(intent);    return remote.asObject();}
    

    完成上述步骤后,可以通过点击事件实现连接、利用连接关系控制PA以及断开连接等行为,代码示例如下:

    // 连接远程PAprivate ClickedListener mConnectRemotePAListener = new ClickedListener() {    @Override    public void onClick(Component arg0) {        scheduleRemoteAbility(new ISelectResult() {            @Override            void onSelectResult(String deviceId) {                if (deviceId != null) {                    Intent connectPAIntent = new Intent();                    // bundleName和abilityName与待连接的PA一一对应                    // 例如:bundleName = "com.helloworld"                    //       abilityName = "com.helloworld.SampleParticleAbility"                    Operation operation = new Intent.OperationBuilder()                            .withDeviceId(deviceId)                            .withBundleName(bundleName)                            .withAbilityName(abilityName)                            .withFlags(Intent.FLAG_ABILITYSLICE_MULTI_DEVICE)                            .build();                    connectPAIntent.setOperation(operation);                    connectAbility(connectPAIntent, mConn);                }            }        });    }};// 控制已连接PA执行加法private ClickedListener mControlPAListener = new ClickedListener() {    @Override    public void onClick(Component arg0) {        if (mProxy != null) {            int ret = -1;            try {                ret = mProxy.plus(10, 20);            } catch (RemoteException e) {                HiLog.error(LABEL, "ControlRemotePA error");            }            btnControlRemotePA.setText("ControlRemotePA result = " + ret);        }    }};// 与远程PA断开连接private ClickedListener mDisconnectRemotePAListener = new ClickedListener() {    @Override    public void onClick(Component arg0) {        // 按钮复位        btnConnectRemotePA.setText("ConnectRemotePA");        btnControlRemotePA.setText("ControlRemotePA");        disconnectAbility(mConn);    }};
    

    说明

    通过连接/断开连接远程PA,与跨设备PA建立长期的管理关系。例如在本例中,通过连接关系得到远程PA的控制代理后,实现跨设备计算加法并将结果返回到本地显示。在实际开发中,开发者可以根据需要实现多种分布式场景,例如:跨设备位置/电量等信息的采集、跨设备计算资源互助等。

  9. 设备A将运行时的FA迁移到设备B,实现业务在设备间无缝迁移。

    // 跨设备迁移FA// 本地FA设置当前运行任务private ClickedListener mContinueAbilityListener = new ClickedListener() {    @Override    public void onClick(Component arg0) {        // 用户选择设备后实现业务迁移        scheduleRemoteAbility(new ISelectResult() {            @Override            public void onSelectResult(String deviceId) {                continueAbility(deviceId);            }        });    }};
    

    FA的迁移还涉及到状态数据的传递,需要继承IAbilityContinuation接口,供开发者实现迁移过程中特定事件的管理能力,代码示例如下:

    public class SampleSlice extends AbilitySlice implements IAbilityContinuation {    @Override    public boolean onSaveData(IntentParams saveData) {        String exampleData = String.valueOf(System.currentTimeMillis());        saveData.setParam("continueParam", exampleData);        return true;    }
        @Override    public boolean onRestoreData(IntentParams restoreData) {        // 远端FA迁移传来的状态数据,开发者可以按照特定的场景对这些数据进行处理        Object data = restoreData.getParam("continueParam");        return true;    }
        @Override    public void onCompleteContinuation(int result) {        btnContinueRemoteFA.setText("ContinueAbility Done");    }}
    

    通过自定义迁移事件相关的行为,最终实现对Ability的迁移。具体的定义可以参考相关的API文档,此处主要以较为常用的两个事件,包括迁移发起端完成迁移的回调onCompleteContinuation(int result)以及接收到远端迁移行为传递数据的回调onRestoreData(IntentParams restoreData)。其他还包括迁移到远端设备的FA关闭的回调onRemoteTerminated()、用于本地迁移发起时保存状态数据的回调onSaveData(IntentParams saveData)和本地发起迁移的回调onStartContinuation()。按照实际应用自定义特定场景对应的回调,可以完成多种场景下FA的迁移任务。

    说明

    • FA迁移可以打通设备间的壁垒,有助于不同能力的设备进行互助。前文以一个简单的例子介绍如何通过分布式任务调度提供的能力,实现FA跨设备的迁移(包括FA启动及状态数据的同步)。
    • FA迁移过程中,远端FA首先接收到发起端FA传输的数据,再执行启动,即onRestoreData()发生在onStart()之前,详见API参考。

相关实例

针对分布式任务调度,有以下示例工程可供参考:

  • DistributedScheduler

    本示例演示了分布式任务调度的六种场景:启动远程FA,启动远程PA,关闭远程PA,连接远程PA,断开连接远程PA, 和FA跨端迁移。

针对分布式任务调度,有以下Codelabs可供参考:

  • 分布式调度启动远程FA

    基于分布式调度的能力,实现远程FA的启动。

  • 分布式亲子早教系统

    基于分布式能力,实现一个多屏互动、跨设备协同的亲子早教系统。

  • 分布式输入法

    基于分布式能力,将手机作为智慧屏的虚拟控制器,控制文字输入和遥控播放。

  • 分布式地图导航

    基于分布式能力,实现地图导航信息在手机-车机-智能穿戴设备之间流转。

results matching ""

    No results matching ""