Class MqttBrokerConnection

java.lang.Object
org.openhab.core.io.transport.mqtt.MqttBrokerConnection

@NonNullByDefault public class MqttBrokerConnection extends Object
An MQTTBrokerConnection represents a single client connection to a MQTT broker. When a connection to an MQTT broker is lost, it will try to reconnect every 60 seconds.
Author:
Davy Vanherbergen - Initial contribution, David Graeff - All operations are async now. More flexible sslContextProvider and reconnectStrategy added., Markus Rathgeb - added connection state callback, Jan N. Klug - changed from PAHO to HiveMQ client, Mark Herwege - Added flag for hostname validation
  • Field Details

  • Constructor Details

    • MqttBrokerConnection

      public MqttBrokerConnection(String host, @Nullable Integer port, boolean secure, @Nullable String clientId)
      Create a new TCP MQTT3 client connection to a server with the given host and port.
      Parameters:
      host - A host name or address
      port - A port or null to select the default port for a secure or insecure connection
      secure - A secure connection
      clientId - Client id. Each client on a MQTT server has a unique client id. Sometimes client ids are used for access restriction implementations. If none is specified, a default is generated. The client id cannot be longer than 65535 characters.
      Throws:
      IllegalArgumentException - If the client id or port is not valid.
    • MqttBrokerConnection

      public MqttBrokerConnection(String host, @Nullable Integer port, boolean secure, boolean hostnameValidated, @Nullable String clientId)
      Create a new TCP MQTT3 client connection to a server with the given host and port.
      Parameters:
      host - A host name or address
      port - A port or null to select the default port for a secure or insecure connection
      secure - A secure connection
      hostnameValidated - Validate hostname from certificate against server hostname for secure connection
      clientId - Client id. Each client on a MQTT server has a unique client id. Sometimes client ids are used for access restriction implementations. If none is specified, a default is generated. The client id cannot be longer than 65535 characters.
      Throws:
      IllegalArgumentException - If the client id or port is not valid.
    • MqttBrokerConnection

      public MqttBrokerConnection(MqttBrokerConnection.Protocol protocol, MqttBrokerConnection.MqttVersion mqttVersion, String host, @Nullable Integer port, boolean secure, @Nullable String clientId)
      Create a new MQTT client connection to a server with the given protocol, host and port.
      Parameters:
      protocol - The transport protocol
      mqttVersion - The version of the MQTT client (v3 or v5)
      host - A host name or address
      port - A port or null to select the default port for a secure or insecure connection
      secure - A secure connection
      clientId - Client id. Each client on a MQTT server has a unique client id. Sometimes client ids are used for access restriction implementations. If none is specified, a default is generated. The client id cannot be longer than 65535 characters.
      Throws:
      IllegalArgumentException - If the client id or port is not valid.
    • MqttBrokerConnection

      public MqttBrokerConnection(MqttBrokerConnection.Protocol protocol, MqttBrokerConnection.MqttVersion mqttVersion, String host, @Nullable Integer port, boolean secure, boolean hostnameValidated, @Nullable String clientId)
      Create a new MQTT client connection to a server with the given protocol, host and port.
      Parameters:
      protocol - The transport protocol
      mqttVersion - The version of the MQTT client (v3 or v5)
      host - A host name or address
      port - A port or null to select the default port for a secure or insecure connection
      secure - A secure connection
      hostnameValidated - Validate hostname from certificate against server hostname for secure connection
      clientId - Client id. Each client on a MQTT server has a unique client id. Sometimes client ids are used for access restriction implementations. If none is specified, a default is generated. The client id cannot be longer than 65535 characters.
      Throws:
      IllegalArgumentException - If the client id or port is not valid.
  • Method Details

    • setReconnectStrategy

      public void setReconnectStrategy(AbstractReconnectStrategy reconnectStrategy)
      Set the reconnect strategy. The implementor will be called when the connection state to the MQTT broker changed. The reconnect strategy will not be informed if the initial connection to the broker timed out. You need a timeout executor additionally, see setTimeoutExecutor(ScheduledExecutorService, int).
      Parameters:
      reconnectStrategy - The reconnect strategy. May not be null.
    • getReconnectStrategy

      public @Nullable AbstractReconnectStrategy getReconnectStrategy()
      Returns:
      Return the reconnect strategy
    • setTimeoutExecutor

      public void setTimeoutExecutor(@Nullable ScheduledExecutorService executor, int timeoutInMS)
      Set a timeout executor. If none is set, you will not be notified of connection timeouts, this also includes a non-firing reconnect strategy. The default executor is none.
      Parameters:
      executor - One timer will be created when a connection attempt happens
      timeoutInMS - Timeout in milliseconds
    • setTrustManagers

      public void setTrustManagers(TrustManager[] trustManagers)
    • getTrustManagers

      public TrustManager[] getTrustManagers()
    • getProtocol

      public MqttBrokerConnection.Protocol getProtocol()
      Get the MQTT broker protocol
    • getMqttVersion

      public MqttBrokerConnection.MqttVersion getMqttVersion()
      Get the MQTT version
    • getHost

      public String getHost()
      Get the MQTT broker host
    • getPort

      public int getPort()
      Get the MQTT broker port
    • isSecure

      public boolean isSecure()
      Return true if this is or will be an encrypted connection to the broker
    • isHostnameValidated

      public boolean isHostnameValidated()
      Return true if hostname in certificate is validated against server hostname for secure connection
    • setCredentials

      public void setCredentials(@Nullable String user, @Nullable String password)
      Set the optional user name and optional password to use when connecting to the MQTT broker. The connection needs to be restarted for the new settings to take effect.
      Parameters:
      user - Name to use for connection.
      password - The password
    • getPassword

      public @Nullable String getPassword()
      Returns:
      connection password.
    • getUser

      public @Nullable String getUser()
      Returns:
      optional user name for the MQTT connection.
    • getQos

      public int getQos()
      Returns:
      quality of service level.
    • setQos

      public void setQos(int qos)
      Set quality of service. Valid values are 0, 1, 2 and mean "at most once", "at least once" and "exactly once" respectively. The connection needs to be restarted for the new settings to take effect.
      Parameters:
      qos - level.
    • getLastWill

      public @Nullable MqttWillAndTestament getLastWill()
      Return the last will object or null if there is none.
    • setLastWill

      public void setLastWill(@Nullable MqttWillAndTestament lastWill, boolean applyImmediately) throws org.osgi.service.cm.ConfigurationException, MqttException
      Set the last will object.
      Parameters:
      lastWill - The last will object or null.
      applyImmediately - If true, the connection will stopped and started for the new last-will to take effect immediately.
      Throws:
      MqttException
      org.osgi.service.cm.ConfigurationException
    • setLastWill

      public void setLastWill(@Nullable MqttWillAndTestament lastWill)
      Set the last will object. The connection needs to be restarted for the new settings to take effect.
      Parameters:
      lastWill - The last will object or null.
    • setUnsubscribeOnStop

      public void setUnsubscribeOnStop(boolean unsubscribeOnStop)
      Enable / disable sending Unsubscribe command when the connection is closed Some servers can be quirky, then do not handle Usubscribe request properly. In this case we have to omit sending it. Example: iRobot built-in MQTT server. By default this behavior is set to true.
      Parameters:
      unsubscribeOnStop - Enable or disable flag.
    • getClientId

      public String getClientId()
      Get client id to use when connecting to the broker.
      Returns:
      value clientId to use.
    • connectionState

      public MqttConnectionState connectionState()
      Returns the connection state
    • setKeepAliveInterval

      public void setKeepAliveInterval(int keepAliveInterval)
      Set the keep alive interval. The default interval is 60 seconds. If no heartbeat is received within this timeframe, the connection will be considered dead. Set this to a higher value on systems which may not always be able to process the heartbeat in time.
      Parameters:
      keepAliveInterval - interval in seconds
    • getKeepAliveInterval

      public int getKeepAliveInterval()
      Return the keep alive internal in seconds
    • hasSubscribers

      public boolean hasSubscribers()
      Return true if there are subscribers registered via subscribe(String, MqttMessageSubscriber). Call unsubscribe(String, MqttMessageSubscriber) or unsubscribeAll() if necessary.
    • subscribe

      public CompletableFuture<Boolean> subscribe(String topic, MqttMessageSubscriber subscriber)
      Add a new message consumer to this connection. Multiple subscribers with the same topic are allowed. This method will not protect you from adding a subscriber object multiple times! If there is a retained message for the topic, you are guaranteed to receive a callback for each new subscriber, even for the same topic.
      Parameters:
      topic - The topic to subscribe to.
      subscriber - The callback listener for received messages for the given topic.
      Returns:
      Completes with true if successful. Completes with false if not connected yet. Exceptionally otherwise.
    • subscribeRaw

      protected CompletableFuture<Boolean> subscribeRaw(String topic, org.openhab.core.io.transport.mqtt.internal.Subscription subscription)
      Subscribes to a topic on the given connection, but does not alter the subscriber list.
      Parameters:
      topic - The topic to subscribe to.
      Returns:
      Completes with true if successful. Exceptionally otherwise.
    • unsubscribe

      public CompletableFuture<Boolean> unsubscribe(String topic, MqttMessageSubscriber subscriber)
      Remove a previously registered consumer from this connection. If no more consumers are registered for a topic, the topic will be unsubscribed from.
      Parameters:
      topic - The topic to unsubscribe from.
      subscriber - The callback listener to remove.
      Returns:
      Completes with true if successful. Exceptionally otherwise.
    • unsubscribeRaw

      protected CompletableFuture<Boolean> unsubscribeRaw(org.openhab.core.io.transport.mqtt.internal.client.MqttAsyncClientWrapper client, String topic)
      Unsubscribes from a topic on the given connection, but does not alter the subscriber list.
      Parameters:
      client - The client connection
      topic - The topic to unsubscribe from
      Returns:
      Completes with true if successful. Completes with false if no broker connection is established. Exceptionally otherwise.
    • addConnectionObserver

      public void addConnectionObserver(MqttConnectionObserver connectionObserver)
      Add a new connection observer to this connection.
      Parameters:
      connectionObserver - The connection observer that should be added.
    • removeConnectionObserver

      public void removeConnectionObserver(MqttConnectionObserver connectionObserver)
      Remove a previously registered connection observer from this connection.
      Parameters:
      connectionObserver - The connection observer that should be removed.
    • hasConnectionObservers

      public boolean hasConnectionObservers()
      Return true if there are connection observers registered via addConnectionObserver().
    • start

      public CompletableFuture<Boolean> start()
      This will establish a connection to the MQTT broker and if successful, notify all publishers and subscribers that the connection has become active. This method will do nothing if there is already an active connection.
      Returns:
      Returns a future that completes with true if already connected or connecting, completes with false if a connection timeout has happened and completes exceptionally otherwise.
    • createClient

      protected org.openhab.core.io.transport.mqtt.internal.client.MqttAsyncClientWrapper createClient()
    • finalizeStopAfterDisconnect

      protected boolean finalizeStopAfterDisconnect(boolean v)
      After a successful disconnect, the underlying library objects need to be closed and connection observers want to be notified.
      Parameters:
      v - A passthrough boolean value
      Returns:
      Returns the value of the parameter v.
    • unsubscribeAll

      public CompletableFuture<Void> unsubscribeAll()
      Unsubscribe from all topics
      Returns:
      Returns a future that completes as soon as all subscriptions have been canceled.
    • stop

      public CompletableFuture<Boolean> stop()
      Unsubscribes from all subscribed topics, stops the reconnect strategy, disconnect and close the client. You can re-establish a connection calling start() again. Do not call start, before the closing process has finished completely.
      Returns:
      Returns a future that completes as soon as the disconnect process has finished.
    • publish

      public CompletableFuture<Boolean> publish(String topic, byte[] payload, int qos, boolean retain)
      Publish a message to the broker with the given QoS and retained flag.
      Parameters:
      topic - The topic
      payload - The message payload
      qos - The quality of service for this message
      retain - Set to true to retain the message on the broker
      Returns:
      Returns a future that completes with a result of true if the publishing succeeded and completes exceptionally on an error or with a result of false if no broker connection is established.
    • cancelTimeoutFuture

      protected void cancelTimeoutFuture()
      The connection process is limited by a timeout, realized with a CompletableFuture. Cancel that future now, if it exists.