Skip to content

Commit

Permalink
[amazonechocontrol] Fix push activity handling
Browse files Browse the repository at this point in the history
Signed-off-by: Jan N. Klug <[email protected]>
  • Loading branch information
J-N-K committed Jan 19, 2025
1 parent 6987890 commit 07a672d
Showing 1 changed file with 17 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -116,7 +115,7 @@ public class AccountHandler extends BaseBridgeHandler implements PushConnection.
private final Map<String, EchoHandler> echoHandlers = new ConcurrentHashMap<>();
private final Set<SmartHomeDeviceHandler> smartHomeDeviceHandlers = new CopyOnWriteArraySet<>();
private final Set<FlashBriefingProfileHandler> flashBriefingProfileHandlers = new CopyOnWriteArraySet<>();
private final Set<String> deviceSerialNumbers = new CopyOnWriteArraySet<>();
private final LinkedBlockingQueue<String> pushActivityProcessingQueue = new LinkedBlockingQueue<>();

private final Object synchronizeConnection = new Object();
private Map<String, DeviceTO> serialNumberDeviceMapping = new HashMap<>();
Expand Down Expand Up @@ -629,14 +628,15 @@ public void onPushCommandReceived(PushCommandTO pushCommand) {
}
echoHandler.handlePushCommand(command, payload);
if ("PUSH_EQUALIZER_STATE_CHANGE".equals(command) || "PUSH_VOLUME_CHANGE".equals(command)) {
deviceSerialNumbers.add(dopplerId.deviceSerialNumber);
pushActivityProcessingQueue.add(dopplerId.deviceSerialNumber);

// check if a processing job is already scheduled or we need to create a new one
ScheduledFuture<?> refreshActivityJob = this.refreshActivityJob;
if (refreshActivityJob != null) {
refreshActivityJob.cancel(false);
if (refreshActivityJob == null || refreshActivityJob.isDone()) {
this.refreshActivityJob = scheduler.schedule(
() -> handlePushActivity(pushCommand.timeStamp),
handlerConfig.activityRequestDelay, TimeUnit.SECONDS);
}
this.refreshActivityJob = scheduler.schedule(
() -> handlePushActivity(deviceSerialNumbers, pushCommand.timeStamp),
handlerConfig.activityRequestDelay, TimeUnit.SECONDS);
}
}
}
Expand Down Expand Up @@ -673,22 +673,25 @@ private List<CustomerHistoryRecordTO> getCustomerActivity(@Nullable Long timesta
return connection.getActivities(startTimestamp, endTimestamp);
}

private void handlePushActivity(Set<String> deviceSerialNumbers, @Nullable Long timestamp) {
private void handlePushActivity(@Nullable Long timestamp) {
Set<String> deviceSerialNumbers = new HashSet<>();
pushActivityProcessingQueue.drainTo(deviceSerialNumbers);

List<CustomerHistoryRecordTO> activityRecords = getCustomerActivity(timestamp);

Iterator<String> iterator = deviceSerialNumbers.iterator();
while (iterator.hasNext()) {
while (!pushActivityProcessingQueue.isEmpty()) {
String deviceSerialNumber = pushActivityProcessingQueue.poll();
try {
String deviceSerialNumber = iterator.next();
Objects.requireNonNull(deviceSerialNumber);
EchoHandler echoHandler = echoHandlers.get(deviceSerialNumber);
if (echoHandler == null) {
logger.warn("Could not find thing handler for serialnumber {}", deviceSerialNumber);
return;
}
activityRecords.stream().filter(r -> r.recordKey.endsWith(deviceSerialNumber))
.forEach(echoHandler::handlePushActivity);
} finally {
iterator.remove();
} catch (RuntimeException e) {
logger.warn("Could not handle push activity", e);
}
}
}
Expand Down

0 comments on commit 07a672d

Please sign in to comment.