Page 280
# List the WiFi access points visible to the station interface.
import network  # required by the calls below; the listing never imported it

wifi = network.WLAN(network.STA_IF)
wifi.active(True)
wifi.disconnect()

# scan() returns one entry per access point found; print each entry as-is.
aps = wifi.scan()
for ap in aps:
    print(ap)
Page 282
from http.server import HTTPServer, BaseHTTPRequestHandler
from io import BytesIO
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
    """Test endpoint that answers the common HTTP methods with status 200.

    GET returns a fixed greeting, HEAD returns headers only, and POST, PUT,
    DELETE and PATCH echo the request body back to the client.
    """

    def sendResponse(self, cmd):
        """Reply 200 with a message naming ``cmd`` followed by the request body.

        Args:
            cmd: Name of the HTTP method being answered, e.g. ``"POST"``.
        """
        # A request without a body (typical for DELETE) carries no
        # Content-Length header; treat that as an empty body rather than
        # failing while converting the missing header to int.
        content_length = int(self.headers.get('Content-Length') or 0)
        body = self.rfile.read(content_length)
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b''.join((
            b'This is a ', bytes(cmd, 'utf-8'), b' request. ',
            b'Received: ',
            body,
        )))

    def do_GET(self):
        """Reply 200 with a fixed greeting."""
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b'Hello, world!')

    def do_HEAD(self):
        """Reply 200 with headers only."""
        self.send_response(200)
        self.end_headers()

    def do_POST(self):
        """Echo the body of a POST request."""
        self.sendResponse("POST")

    def do_PUT(self):
        """Echo the body of a PUT request."""
        self.sendResponse("PUT")

    def do_DELETE(self):
        """Echo the body (if any) of a DELETE request."""
        self.sendResponse("DELETE")

    def do_PATCH(self):
        """Echo the body of a PATCH request."""
        self.sendResponse("PATCH")
# Listen on every interface, port 8080, and serve until interrupted.
# The with-block closes the listening socket when serve_forever() exits,
# including when it exits through an exception such as KeyboardInterrupt.
with HTTPServer(('', 8080), SimpleHTTPRequestHandler) as httpd:
    httpd.serve_forever()
Page 285
import urequests
import network
import rp2
from machine import Pin, Timer
from time import sleep_ms
def setup(country, ssid, key):
    """Join a WiFi network in station mode, showing progress on the LED.

    The on-board LED toggles every 200 ms while connecting, is left on
    once connected, and toggles every 1000 ms if the attempt failed or
    did not complete within about 20 s.

    Args:
        country: Country code handed to ``rp2.country``.
        ssid: Name of the access point to join.
        key: Password for the access point.

    Returns:
        The ``network.WLAN`` station object, returned whether or not the
        connection succeeded.
    """
    rp2.country(country)
    wifi = network.WLAN(network.STA_IF)
    wifi.active(True)
    wifi.disconnect()

    LED = Pin("LED", Pin.OUT)
    LED.high()

    wifi.connect(ssid, key)
    timer = Timer()
    timer.init(period=200, mode=Timer.PERIODIC,
               callback=lambda t: LED.toggle())

    # Poll every 100 ms for up to 20 s. Status 3 means connected and a
    # negative status means the attempt failed, so stop waiting on either.
    s = 0
    timeout = 20000
    while timeout > 0:
        s = wifi.status()
        if s == 3 or s < 0:
            break
        sleep_ms(100)
        timeout = timeout - 100

    if s != 3:
        # Not connected. This also covers a timeout while still joining or
        # waiting for an IP address (status 0-2), which must not be shown
        # as success.
        timer.init(period=1000, mode=Timer.PERIODIC,
                   callback=lambda t: LED.toggle())
    else:
        timer.deinit()
        LED.high()
    return wifi
def _show(response, with_headers=False):
    """Print a response body (and optionally its headers), then close it.

    The close happens in ``finally`` so the underlying socket is released
    even if reading or printing the response raises.
    """
    try:
        print(response.content)
        if with_headers:
            print(response.headers)
    finally:
        response.close()


wifi = setup("country", "ssid", "key")
url = "http://192.168.253.45:8080"
buf = b'Hello World'

# Exercise each HTTP method against the test server in turn.
_show(urequests.get(url))
_show(urequests.post(url, data=buf))
_show(urequests.put(url, data=buf))
_show(urequests.patch(url, data=buf))
_show(urequests.head(url), with_headers=True)
_show(urequests.delete(url, data=buf))
Page 287
from machine import Pin, Timer
import network
import rp2
import onewire
import ds18x20
from time import sleep_ms
import urequests
def setup(country, ssid, key):
    """Join a WiFi network in station mode, showing progress on the LED.

    The on-board LED toggles every 200 ms while connecting, is left on
    once connected, and toggles every 1000 ms if the attempt failed or
    did not complete within about 20 s.

    Args:
        country: Country code handed to ``rp2.country``.
        ssid: Name of the access point to join.
        key: Password for the access point.

    Returns:
        The ``network.WLAN`` station object, returned whether or not the
        connection succeeded.
    """
    rp2.country(country)
    wifi = network.WLAN(network.STA_IF)
    wifi.active(True)
    wifi.disconnect()

    LED = Pin("LED", Pin.OUT)
    LED.high()

    wifi.connect(ssid, key)
    timer = Timer()
    timer.init(period=200, mode=Timer.PERIODIC,
               callback=lambda t: LED.toggle())

    # Poll every 100 ms for up to 20 s. Status 3 means connected and a
    # negative status means the attempt failed, so stop waiting on either.
    s = 0
    timeout = 20000
    while timeout > 0:
        s = wifi.status()
        if s == 3 or s < 0:
            break
        sleep_ms(100)
        timeout = timeout - 100

    if s != 3:
        # Not connected. This also covers a timeout while still joining or
        # waiting for an IP address (status 0-2), which must not be shown
        # as success.
        timer.init(period=1000, mode=Timer.PERIODIC,
                   callback=lambda t: LED.toggle())
    else:
        timer.deinit()
        LED.high()
    return wifi
wifi = setup("country", "ssid", "key")
url = "http://192.168.253.45:8080"

# 1-Wire bus on GPIO 22; reset() reports whether any device answered.
ow = onewire.OneWire(Pin(22))
presence = ow.reset()
if presence:
    print("Device present")
else:
    print("No device")

DS = ds18x20.DS18X20(ow)
roms = DS.scan()

# Read the first sensor found and PUT its value to the server, forever.
while True:
    DS.convert_temp()
    # A conversion needs up to 750 ms; reading earlier returns a stale
    # value rather than the measurement just requested.
    sleep_ms(750)
    temp = DS.read_temp(roms[0])
    buf = str(temp).encode("utf-8")
    try:
        r = urequests.put(url, data=buf)
        r.close()
    except OSError:
        # Connection failures surface as OSError. Catching only that keeps
        # Ctrl-C and genuine programming errors visible.
        print("Server Not Online")
    sleep_ms(500)
Page 288
from http.server import HTTPServer, BaseHTTPRequestHandler
from io import BytesIO
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
    """Receive temperature readings sent as the text body of PUT requests."""

    def log_message(self, *args, **kwargs):
        """Discard the base class's per-request log output."""
        pass

    def do_PUT(self):
        """Print the body parsed as a float and reply 200.

        Replies 400 instead when the body is missing or is not a valid
        number, so a bad reading cannot abort the handler mid-request.
        """
        content_length = int(self.headers.get('Content-Length') or 0)
        body = self.rfile.read(content_length)
        try:
            bodyString = body.decode(encoding="utf-8")
            temp = float(bodyString)
        except ValueError:
            # Covers both undecodable bytes (UnicodeDecodeError is a
            # ValueError) and text that is not a number, e.g. "None".
            self.send_response(400)
            self.end_headers()
            return
        print(temp)
        self.send_response(200)
        self.end_headers()
# Listen on every interface, port 8080, and serve until interrupted.
# The with-block closes the listening socket when serve_forever() exits,
# including when it exits through an exception such as KeyboardInterrupt.
with HTTPServer(('', 8080), SimpleHTTPRequestHandler) as httpd:
    httpd.serve_forever()
Page 294
import network
import socket
import rp2
from machine import Pin, Timer
from time import sleep_ms
def setup(country, ssid, key):
    """Join a WiFi network in station mode, showing progress on the LED.

    The on-board LED toggles every 200 ms while connecting, is left on
    once connected, and toggles every 1000 ms if the attempt failed or
    did not complete within about 20 s.

    Args:
        country: Country code handed to ``rp2.country``.
        ssid: Name of the access point to join.
        key: Password for the access point.

    Returns:
        The ``network.WLAN`` station object, returned whether or not the
        connection succeeded.
    """
    rp2.country(country)
    wifi = network.WLAN(network.STA_IF)
    wifi.active(True)
    wifi.disconnect()

    LED = Pin("LED", Pin.OUT)
    LED.high()

    wifi.connect(ssid, key)
    timer = Timer()
    timer.init(period=200, mode=Timer.PERIODIC,
               callback=lambda t: LED.toggle())

    # Poll every 100 ms for up to 20 s. Status 3 means connected and a
    # negative status means the attempt failed, so stop waiting on either.
    s = 0
    timeout = 20000
    while timeout > 0:
        s = wifi.status()
        if s == 3 or s < 0:
            break
        sleep_ms(100)
        timeout = timeout - 100

    if s != 3:
        # Not connected. This also covers a timeout while still joining or
        # waiting for an IP address (status 0-2), which must not be shown
        # as success.
        timer.init(period=1000, mode=Timer.PERIODIC,
                   callback=lambda t: LED.toggle())
    else:
        timer.deinit()
        LED.high()
    return wifi
wifi = setup("country", "ssid", "key")

# Resolve the host, then issue a raw HTTP GET over a plain TCP socket.
ai = socket.getaddrinfo("www.example.com", 80, socket.AF_INET)
addr = ai[0][-1]
s = socket.socket(socket.AF_INET)
try:
    s.connect(addr)
    request = b"GET /index.html HTTP/1.1\r\nHost:example.org\r\n\r\n"
    # sendall() keeps writing until the whole request has gone out;
    # send() may transmit only part of the buffer.
    s.sendall(request)
    # Only the first chunk (up to 512 bytes) of the reply is shown.
    print(s.recv(512))
finally:
    # Release the socket even if connecting or the exchange fails.
    s.close()
Page 296
import network
import socket
import rp2
from machine import Pin, Timer
import ssl
from time import sleep_ms
def setup(country, ssid, key):
    """Join a WiFi network in station mode, showing progress on the LED.

    The on-board LED toggles every 200 ms while connecting, is left on
    once connected, and toggles every 1000 ms if the attempt failed or
    did not complete within about 20 s.

    Args:
        country: Country code handed to ``rp2.country``.
        ssid: Name of the access point to join.
        key: Password for the access point.

    Returns:
        The ``network.WLAN`` station object, returned whether or not the
        connection succeeded.
    """
    rp2.country(country)
    wifi = network.WLAN(network.STA_IF)
    wifi.active(True)
    wifi.disconnect()

    LED = Pin("LED", Pin.OUT)
    LED.high()

    wifi.connect(ssid, key)
    timer = Timer()
    timer.init(period=200, mode=Timer.PERIODIC,
               callback=lambda t: LED.toggle())

    # Poll every 100 ms for up to 20 s. Status 3 means connected and a
    # negative status means the attempt failed, so stop waiting on either.
    s = 0
    timeout = 20000
    while timeout > 0:
        s = wifi.status()
        if s == 3 or s < 0:
            break
        sleep_ms(100)
        timeout = timeout - 100

    if s != 3:
        # Not connected. This also covers a timeout while still joining or
        # waiting for an IP address (status 0-2), which must not be shown
        # as success.
        timer.init(period=1000, mode=Timer.PERIODIC,
                   callback=lambda t: LED.toggle())
    else:
        timer.deinit()
        LED.high()
    return wifi
wifi = setup("country", "ssid", "key")

# Resolve the host, open a TCP connection, then layer TLS on top of it.
ai = socket.getaddrinfo("example.com", 443, socket.AF_INET)
addr = ai[0][-1]
s = socket.socket(socket.AF_INET)
sslSock = None
try:
    s.connect(addr)
    # server_hostname sends the host name during the handshake (SNI), which
    # servers hosting several sites need in order to pick a certificate.
    sslSock = ssl.wrap_socket(s, server_hostname="example.com")
    request = b"GET / HTTP/1.1\r\nHost:example.com\r\n\r\n"
    sslSock.write(request)
    # Only the first chunk (up to 1024 bytes) of the reply is shown.
    print(sslSock.read(1024))
finally:
    # Close through the TLS wrapper when it exists; otherwise close the
    # raw socket so a failed connect/handshake does not leak it.
    if sslSock is not None:
        sslSock.close()
    else:
        s.close()
Page 299-300
import network
import socket
import rp2
from time import sleep_ms
from machine import Pin, Timer
import onewire
import ds18x20
def setup(country, ssid, key):
    """Join a WiFi network in station mode, showing progress on the LED.

    The on-board LED toggles every 200 ms while connecting, is left on
    once connected, and toggles every 1000 ms if the attempt failed or
    did not complete within about 20 s.

    Args:
        country: Country code handed to ``rp2.country``.
        ssid: Name of the access point to join.
        key: Password for the access point.

    Returns:
        The ``network.WLAN`` station object, returned whether or not the
        connection succeeded.
    """
    rp2.country(country)
    wifi = network.WLAN(network.STA_IF)
    wifi.active(True)
    wifi.disconnect()

    LED = Pin("LED", Pin.OUT)
    LED.high()

    wifi.connect(ssid, key)
    timer = Timer()
    timer.init(period=200, mode=Timer.PERIODIC,
               callback=lambda t: LED.toggle())

    # Poll every 100 ms for up to 20 s. Status 3 means connected and a
    # negative status means the attempt failed, so stop waiting on either.
    s = 0
    timeout = 20000
    while timeout > 0:
        s = wifi.status()
        if s == 3 or s < 0:
            break
        sleep_ms(100)
        timeout = timeout - 100

    if s != 3:
        # Not connected. This also covers a timeout while still joining or
        # waiting for an IP address (status 0-2), which must not be shown
        # as success.
        timer.init(period=1000, mode=Timer.PERIODIC,
                   callback=lambda t: LED.toggle())
    else:
        timer.deinit()
        LED.high()
    return wifi
wifi = setup("country", "ssid", "key")
print("connected")
print(wifi.ifconfig())

# 1-Wire bus on GPIO 22; reset() reports whether any device answered.
ow = onewire.OneWire(Pin(22))
presence = ow.reset()
if presence:
    print("Device present")
else:
    print("No device")

DS = ds18x20.DS18X20(ow)
roms = DS.scan()

# Page served to every client; the comment marker is replaced per request.
template = """<!DOCTYPE html>
<html>
<head> <title>Temperature</title> </head>
<body> <h1>Current Temperature</h1>
Hello Pico W Server World <br/>
The Temperature is: <!--#temp--><br/>
</body>
</html>
"""

addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(addr)
s.listen(0)
try:
    # Serve one client at a time: read the request, reply with the page.
    while True:
        cl, addr = s.accept()
        try:
            print('client connected from', addr)
            print(cl.recv(512))
            DS.convert_temp()
            # A conversion needs up to 750 ms; reading earlier returns a
            # stale value rather than the measurement just requested.
            sleep_ms(750)
            temp = DS.read_temp(roms[0])
            html = template.replace("<!--#temp-->", str(temp))
            body = html.encode("utf-8")
            # Content-Length counts bytes of the encoded body, not characters.
            headers = ("HTTP/1.1 200 OK\r\n"
                       "Content-Type: text/html; charset=UTF-8\r\n"
                       "Server:Pico\r\n"
                       f"Content-Length:{len(body)}\r\n\r\n"
                       )
            buf = headers.encode("utf-8") + body
            # sendall() writes the whole buffer; send() may stop part-way.
            cl.sendall(buf)
        finally:
            # Always release the client socket, even if the exchange fails.
            cl.close()
finally:
    # Reached when the loop is left through an exception (e.g. Ctrl-C).
    s.close()
Page 303
import network
import socket
import rp2
from time import sleep_ms
from machine import Pin, Timer
import onewire
import ds18x20
import ssl
def setup(country, ssid, key):
    """Join a WiFi network in station mode, showing progress on the LED.

    The on-board LED toggles every 200 ms while connecting, is left on
    once connected, and toggles every 1000 ms if the attempt failed or
    did not complete within about 20 s.

    Args:
        country: Country code handed to ``rp2.country``.
        ssid: Name of the access point to join.
        key: Password for the access point.

    Returns:
        The ``network.WLAN`` station object, returned whether or not the
        connection succeeded.
    """
    rp2.country(country)
    wifi = network.WLAN(network.STA_IF)
    wifi.active(True)
    wifi.disconnect()

    LED = Pin("LED", Pin.OUT)
    LED.high()

    wifi.connect(ssid, key)
    timer = Timer()
    timer.init(period=200, mode=Timer.PERIODIC,
               callback=lambda t: LED.toggle())

    # Poll every 100 ms for up to 20 s. Status 3 means connected and a
    # negative status means the attempt failed, so stop waiting on either.
    s = 0
    timeout = 20000
    while timeout > 0:
        s = wifi.status()
        if s == 3 or s < 0:
            break
        sleep_ms(100)
        timeout = timeout - 100

    if s != 3:
        # Not connected. This also covers a timeout while still joining or
        # waiting for an IP address (status 0-2), which must not be shown
        # as success.
        timer.init(period=1000, mode=Timer.PERIODIC,
                   callback=lambda t: LED.toggle())
    else:
        timer.deinit()
        LED.high()
    return wifi
key=b'0\x82\x04\xbf\x02\x01\x000\r\x06\t*\x86H\x86\xf7\r\x01\x01\x01\x05\x00\x04\x82\x04\xa90\x82\x04\xa5\x02\x01\x00\x02\x82\x01\x01\x00\xb9SU\x05\x056\x8c\x92I\\?\xe5rb\x14\xaceG\xb8\x04n\x8f\xb6O\x04o\x9a\xcdP\xbe\xc3\xfb\x07\x8fl\xe4E\xd6\xf6>\xeb\\\xff\x9f\xbe\xad\x941\xda=\x9f\xca\x19\xa8iL\xa8\x91\xd8\xad\xafm\xb7\\|\xd8\x82\xa05:Oe\xe6\xe5\x18\xa6\x0f.\xea\x10<\xe6qh\x00?K\x12m)\xea\x08)\x9e>\'\xab\xca_\xddH\xa3\xd08\xe5\xec0+RB\xae\x8bR\'S\x8dq\x86\xc9\xa9{\xbd1W\xcbG\xb8\xe7V\xbc\x12\x077\x93\xab\xa1\xd6\xb1o\xc1\xdf\x0b\x0b\xa1\xf4Vm\x18\x8b\xf3\xf2\x9c\x86\xa1o\x92|Y\t\x85\x08UpnIFi3\x1d\xd4\x12\x863\n\xef\xb0\xa3\xbf\xd4\xf6\x1c7\x993\xe4\xf4$\x17\xe8\xb7\xb8k\xaaV\xeeB\x92\xa6*\xd9\xa9Y\x88P\x00I\x93Z4)\xb5\xfaW\xe6b\x04L\xee\xe9\xa4\xb1\xbf\
'@s\xd0\xb3}\xc3\xf1[\t\xd9\x91\xeau\x0b\xd2\xba\x94\x7f\xbdO\x11\\e\x81Ed\x88\x97Z\x1a~\x16!\x02\x03\x01\x00\x01\x02\x82\x01\x01\x00\x81\xd9\x88\x85\x86\xfc\x8c\x8b\xe7\x08\xd2\xe0R?\xb4\x9a\x8209\x18)\xdbY\xf2\x8dz!-\xe0xyZ\xc7\x16PF\xb5D\x83\xae\xdc<\x82\x03\x0c\x98\x14p\xc5\xa8M\xf0M\xff\xf9\x1f\xb40\xd4p\x05\xad\xcb>\xeb^\xccO\xb2[\xd3\xcb\xe3v\xfb\xc9Ft)\x9e\x0c\xfd\xad\xd2\x1f\xf29\x08\x85"L\x0fB\x11\xd5\x1c\xf8\xbaHg\x04\x81z\xe0\x93\x00\xe5SED\xe1\x85N\x9f\xadd:z%\x8c1\xde\x02\xd7\xaf\xdf\xe6\x06F\xf6\xdc\x03\xf2K\x19B \x8eX\x9dx\xbb\x9c\xbcE\xa8:\x0f\xab\x0b\xd7\x8f\x7fV\xb1\x0c\xe4\x83\xfc\xcd\x9c\x1c\x072\x0e#\xc2\xb22o94r\xf6kp+\xd9\xcc/\x80\x11\xaa\xeb\xea\xac\'\xba\xecm\xe3;\xbf\xee{\xf7\x07K#\xe7\xc5\xadQ3\x9a[^\xf8\xe5\xb4}\xdc4\xedrD\x01\xef\x8ek\x100B\x03t6^\x87\\\x9c\xf9\x98\tN\xb9\xd9\x9a\xf1\xd3,o\xf5\xf3>\x9f8\xb7\x1f\x1a%\r\x8d\x9a\xd4S\x01\x02\x81\x81\x00\xee\xa5h5)w\xba,\x83\xad!\xa5\x7fzg\xa8v\x97F\xe0\x9a\x18\xe3\xc01\xb4\xd2\x96\x1a;\x94>\x99\xd9\xad\xcd<\xee\xf1\x9e\x041\x06\x9a\x8f\x82\xa4&z\xe0;[8{\xecu"\xa3\xeav&\x1e\xf6\xc2<F^\x19\x91\xa7\x05\x80=u\xe4\xf6c\xeb\x99\xcb\xb6\\\xcb\x88\x96]\x07\xf2\xef\xa8\x80\x0e\x9dh`\x19\xa5+\x03\xd3<\x06\xf4\x81\x96\x18\xe0>i\xbe:\xcc\xb9\x0cY\xfa\x03\x18\x11\'\x10\xa66\xb7-\xfdOq\x02\x81\x81\x00\xc6\xcdQ}\xa2\xdf\x84\x95\n\xac+\x1fx\x98\xb02\x19\x10yP\xa4<\xba\xb1F\xbc\x18\xdc \xe7\x91k\x8ed1a\xdb\xe1\x01\x0e\xb3\xba\x7fF{\xa9\x15\xcfN!\xaeBE\xe2\xdfv\xab\xa7\xf2/\xf7\x9cZ\xb1f\xbc\xe0z\x11\xab\xaf\xc0\x01 v_\x01\x1b\xac\xb8Q.h\xaa1yXF\x95\x1fb\x9e\x9c\x87\x9a[\x98\x90\x96\x0c\xe3\xf5#\nu8\\\xf0\x82\xf0\xec\x97\xa9\xb0\x10q\xd8\x84o\x0b#\r<\x05\x81J9\xb1\x02\x81\x80jA\x83\x90\x88\x12"\xf6\xc6\xfaCL\xe8\xe1\x9b]\xca\xcf\xb8c0\xb9|N\x8a\xd34 Y5\xc5\xdf\xc9\xa8\xbeU\xef\x97\x84E\x13 
\xb1\x0c\x08q\xe6\x9c\xab\x81ClnM\xdf\x0e\x98\x89\xdbO\x17\xd2\x19\x94\x8a9\xda\x94\x0f\xe2\t\xf4\xfbh\x8e\xb5\x95\xef\xc4\xde\x8b\'\xee\x07\xb6\xcb]J\xb1\xa2\x98\xc1\xe9\x1c\x1c\r\xcf\x18\xc3\xef=~\xebF\xf7\x89\xc3\xee\x86.\x89\x07\xb6,\xe5\xb3\x07\xc5\xa3}}PDts)\xa2A\x02\x81\x81\x00\xac#^[\x86+:\x96\xff-\xc3\r6\x14(\x04\xc9\x15-\xa6x\xff\xa8\xbc\x15\xbe\x8b\\\x18\x15\xcb"1\xa2i\xec\rC\x0f\xf2V\x07\xb7k%jl[\x1b\x91(]t<\x158\xa1<\x04\x06*\xc64\xf5\x85;(\xb8*\x12\xdaTK\xe5z\xf9\x9aq\x07&v\x0c\xd4N\x02\x16\xcb,\x1a\xb5\x99d3\xafk%\xc2\xbd\xf7_d\x07\x7f\xf6\xef7\x05\xaa\xb0\x06\xc3&3\xa5#( \xcd\xd3\x84\xf6-\xe0y\xf7\xd0x\x91\x02\x81\x81\x00\x9fn\xb6|\xbf\x8bu\x98\xf0\xc3\xc2k\x1cn\x18y3\x88\xc8\x07l+\xc5\xbf\xd0\x1f\xa8\xe2e\xc6=\xb8\x1a\xfdW\x929;\xfc5~\x96\xb3\x9e\xa5A\xe3\x94\x8d\xc1I\x91\x87\xe0\xcd\x9bO\x80Z\xdcf\x157\xd0\x96:\x16\xb4%\xce\xd9\x17\xfa\xbeF\x19\x81\xc9\x96\xc2\xfd\x84\xafF\x87"\x91\xfb<\xd0\xc8\x11\xadJb\xf1\x17q\xeb\xb0\x11\xc3W\xc7\xf5\xe1\xa1B\x89\x8bZ\xc6\xe7\xa2\xf4?w\xa4\x0c\n\x89\xfd\x00S4\xabp\xd1'
cert=b'0\x82\x03k0\x82\x02S\xa0\x03\x02\x01\x02\x02\x14b\x01\xd9E\xad\x0e\xb3\x7f\x14\xc1\x83\x029V4|\x82E2U0\r\x06\t*\x86H\x86\xf7\r\x01\x01\x0b\x05\x000E1\x0b0\t\x06\x03U\x04\x06\x13\x02AU1\x130\x11\x06\x03U\x04\x08\x0c\nSome-State1!0\x1f\x06\x03U\x04\n\x0c\x18Internet Widgits Pty Ltd0\x1e\x17\r221219141736Z\x17\r231219141736Z0E1\x0b0\t\x06\x03U\x04\x06\x13\x02AU1\x130\x11\x06\x03U\x04\x08\x0c\nSome-State1!0\x1f\x06\x03U\x04\n\x0c\x18Internet Widgits Pty Ltd0\x82\x01"0\r\x06\t*\x86H\x86\xf7\r\x01\x01\x01\x05\x00\x03\x82\x01\x0f\x000\x82\x01\n\x02\x82\x01\x01\x00\xb9SU\x05\x056\x8c\x92I\\?\xe5rb\x14\xaceG\xb8\x04n\x8f\xb6O\x04o\x9a\xcdP\xbe\xc3\xfb\x07\x8fl\xe4E\xd6\xf6>\xeb\\\xff\x9f\xbe\xad\x941\xda=\x9f\xca\x19\xa8iL\xa8\x91\xd8\xad\xafm\xb7\\|\xd8\x82\xa05:Oe\xe6\xe5\x18\xa6\x0f.\xea\x10<\xe6qh\x00?K\x12m)\xea\x08)\x9e>\'\xab\xca_\xddH\xa3\xd08\xe5\xec0+RB\xae\x8bR\'S\x8dq\x86\xc9\xa9{\xbd1W\xcbG\xb8\xe7V\xbc\x12\x077\x93\xab\xa1\xd6\xb1o\xc1\xdf\x0b\x0b\xa1\xf4Vm\x18\x8b\xf3\xf2\x9c\x86\xa1o\x92|Y\t\x85\x08UpnIFi3\x1d\xd4\x12\x863\n\xef\xb0\xa3\xbf\xd4\xf6\x1c7\x993\xe4\xf4$\x17\xe8\xb7\xb8k\xaaV\xeeB\x92\xa6*\xd9\xa9Y\x88P\x00I\x93Z4)\xb5\xfaW\xe6b\x04L\xee\xe9\xa4\xb1\xbf\
'@s\xd0\xb3}\xc3\xf1[\t\xd9\x91\xeau\x0b\xd2\xba\x94\x7f\xbdO\x11\\e\x81Ed\x88\x97Z\x1a~\x16!\x02\x03\x01\x00\x01\xa3S0Q0\x1d\x06\x03U\x1d\x0e\x04\x16\x04\x14\xcf\x1e\x1ce\xccs,\xd7b^\xd3\x99n\xa4\xebi\xe7\xe3\xe9S0\x1f\x06\x03U\x1d#\x04\x180\x16\x80\x14\xcf\x1e\x1ce\xccs,\xd7b^\xd3\x99n\xa4\xebi\xe7\xe3\xe9S0\x0f\x06\x03U\x1d\x13\x01\x01\xff\x04\x050\x03\x01\x01\xff0\r\x06\t*\x86H\x86\xf7\r\x01\x01\x0b\x05\x00\x03\x82\x01\x01\x00\x1c\x1d\x88\xca\xbd/5\xdf\x1b\xdb\xfe\xe4\x92\x7f\x98AK\xe37\x18qE\xf9\xc7D\x9ex\xb2Fo\xc3w\x17V\xe5fD1\x88\x7f\xe5\xfe\xd5}x\x10i\x9ah\xccl-*/\xfaa\x99\xc0\x82\xec\xbe\xca\'\x91v<\x1e g\xc3mm\xfe\x0f\xf7\xc5\xa8\xc8\xe0E\xce\xbb\\\x87s\x1d\x92b\xdd\x11\x12\x99\x196\x81\xf1\x15F\xf7i\xa7\xa2\x9b\xe6yG\x0e\xd73\x82\x13\x14\x95\xa4f-X\xcf\xb2\xcd\xb9AY=\xce\xb4 X\xe7o\xf5\x83\xad\xb5{y\xb4\x84\xd1\xa8\x0f\xcc;\x19\xd2\xd8A:\x1f\xd3\xc20\xb5\x12\'\xa0\xf0y\x8fE\x97\x0b\xbaj\x1c\xc6+\xfd\t$,\x05 B\x98C\x14j\xa2v6/q\xe8\x1bC\xd2w\xd7Z\xbeG\xd23",`\x00P\xb8a/\xea\xc5/\xc6Gh\xbe\x1e5\n`\x97g\t\xe6\xee\xfd\x88x\xfb\xeb\xe5\xb1\xd3\x96\x896*LE\xb7q\xe9\xf6\x07v\xac~\'s=\xe5\xa1N\x00f\x85!\x0e\xddt*\x98\x8a'
wifi = setup("country", "ssid", "key")
print(wifi.ifconfig())

# 1-Wire bus on GPIO 22; reset() reports whether any device answered.
ow = onewire.OneWire(Pin(22))
presence = ow.reset()
if presence:
    print("Device present")
else:
    print("No device")

DS = ds18x20.DS18X20(ow)
roms = DS.scan()

# Page served to every client; the comment marker is replaced per request.
template = """<!DOCTYPE html>
<html>
<head> <title>Temperature</title> </head>
<body> <h1>Current Temperature</h1>
Hello Pico W Server World <br/>
The Temperature is: <!--#temp--><br/>
</body>
</html>
"""

addr = socket.getaddrinfo('0.0.0.0', 443)[0][-1]
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(addr)
s.listen(5)
try:
    while True:
        cl, addr = s.accept()
        print('client connected from', addr)
        client_s = None
        try:
            # Server-side TLS using the key and certificate defined above.
            client_s = ssl.wrap_socket(cl, server_side=True, key=key, cert=cert)
            print("wrapped", client_s)
            # Echo the request headers up to the blank line (or end of stream).
            while True:
                h = client_s.readline()
                if h == b"" or h == b"\r\n":
                    break
                print(h.decode(), end="")
            DS.convert_temp()
            # A conversion needs up to 750 ms; reading earlier returns a
            # stale value rather than the measurement just requested.
            sleep_ms(750)
            temp = DS.read_temp(roms[0])
            html = template.replace("<!--#temp-->", str(temp))
            body = html.encode("utf-8")
            # Content-Length counts bytes of the encoded body, not characters.
            headers = ("HTTP/1.1 200 OK\r\n"
                       "Content-Type: text/html; charset=UTF-8\r\n"
                       "Server:Pico\r\n"
                       f"Content-Length:{len(body)}\r\n\r\n"
                       )
            buf = headers.encode("utf-8") + body
            client_s.write(buf)
        except Exception as e:
            # A failed handshake or exchange must not stop the server.
            print("exception ", e)
        finally:
            # Close on every path. If wrapping failed there is no TLS
            # object, so close the raw client socket instead.
            if client_s is not None:
                client_s.close()
            else:
                cl.close()
finally:
    # Reached when the loop is left through an exception (e.g. Ctrl-C).
    s.close()
Page 315
import uasyncio
async def count(n):
    """Print the integers 0 to ``n - 1`` and return ``n``.

    The body contains no ``await``, so once started it runs to completion
    without yielding to other tasks.
    """
    for i in range(n):
        print(i)
    return n
async def main(myValue):
    """Start ``count(10)`` as a task, wait for it, and return ``myValue``.

    The task is created before the greeting is printed and is awaited
    only after a 5 s sleep, at which point its return value is printed.
    """
    t1 = uasyncio.create_task(count(10))
    print("Hello Coroutine World")
    await uasyncio.sleep(5)
    result = await t1
    print("The result of the task =", result)
    return myValue


# run() hands back whatever main() returns.
result = uasyncio.run(main(42))
print(result)
Page 317
import uasyncio
async def test1(msg):
    """Print ``msg`` and return it unchanged."""
    print(msg)
    return msg
async def main():
    """Run two ``test1`` coroutines with ``gather`` and print their results."""
    result = await uasyncio.gather(test1("one"), test1("two"))
    print(result)
    print("Hello Coroutine World")


uasyncio.run(main())
Page 319
import uasyncio
async def test1(msg):
    """Yield once, then print ``msg`` and return it, even if cancelled.

    NOTE(review): swallowing the cancellation looks deliberate here; it
    lets the task carry on after ``cancel()``. Confirm against the
    surrounding text before changing it.
    """
    try:
        await uasyncio.sleep(0)
    except uasyncio.CancelledError:
        # Only the cancellation is ignored; a bare except would also hide
        # KeyboardInterrupt and real errors.
        pass
    print(msg)
    return msg
async def main():
    """Start ``test1`` as a task, let it run once, then cancel it."""
    t1 = uasyncio.create_task(test1("one"))
    # Yield so the task gets to start before it is cancelled.
    await uasyncio.sleep(0)
    t1.cancel()
    print("Hello Coroutine World")
    # Yield again so the cancelled task gets a chance to run.
    await uasyncio.sleep(0)


uasyncio.run(main())
Page 319
import uasyncio
async def test(msg):
    """Print ``msg`` and then raise ``Exception("Test exception")``."""
    print(msg)
    raise Exception("Test exception")
async def main():
    """Await a task that raises and report the exception."""
    t1 = uasyncio.create_task(test("one"))
    try:
        await t1
    except Exception:
        # The task's exception is re-raised at the await. Catch Exception
        # rather than everything so Ctrl-C and cancellation still propagate.
        print("an exception has occurred")
    print("Hello Coroutine World")
    await uasyncio.sleep(0)


uasyncio.run(main())
Page 320
import uasyncio
async def test(msg):
    """Print ``msg`` and then raise ``Exception("Test exception")``."""
    print(msg)
    raise Exception("Test exception")
async def main():
    """Gather a task that raises, with ``return_exceptions=True``.

    With that flag, exceptions raised by the gathered task are delivered
    in the result list instead of being raised at the await, so
    ``result`` is printed either way.
    """
    t1 = uasyncio.create_task(test("one"))
    result = None
    try:
        result = await uasyncio.gather(t1, return_exceptions=True)
    except Exception:
        # Catch Exception rather than everything so Ctrl-C and
        # cancellation still propagate.
        print("an exception has occurred")
    print("Hello Coroutine World")
    print(result)
    await uasyncio.sleep(0)


uasyncio.run(main())
Page 322
import uasyncio
async def count():
    """Increment the global ``myCounter`` 1000 times, yielding mid-update.

    Each pass reads the counter, yields to the scheduler, and only then
    writes the incremented value back, so another task can run between
    the read and the write.
    NOTE(review): this unsynchronised update looks like a deliberate
    race-condition demonstration; it is left unprotected on purpose.
    """
    global myCounter
    for i in range(1000):
        temp = myCounter + 1
        await uasyncio.sleep(0)
        myCounter = temp
async def main():
    """Run two ``count`` coroutines concurrently and print the final counter."""
    await uasyncio.gather(count(), count())
    print(myCounter)


# Shared state updated by both count() coroutines.
myCounter = 0
uasyncio.run(main())
Page 323
import uasyncio
async def count():
    """Increment the global ``myCounter`` 1000 times under ``myLock``.

    The read, the yield and the write-back all happen while the lock is
    held, so a second coroutine using the same lock cannot interleave
    with an update in progress.
    """
    # myLock is only read here, so it needs no global declaration.
    global myCounter
    for i in range(1000):
        async with myLock:
            temp = myCounter + 1
            await uasyncio.sleep(0)
            myCounter = temp
async def main():
    """Run two ``count`` coroutines concurrently and print the final counter."""
    await uasyncio.gather(count(), count())
    print(myCounter)


# Shared state and the lock that guards it.
myCounter = 0
myLock = uasyncio.Lock()
uasyncio.run(main())
Page 324
import uasyncio
from machine import Pin
async def blink(led, period_ms):
    """Pulse ``led`` forever: on for 5 ms, then off for ``period_ms``.

    Args:
        led: Object with ``on()`` and ``off()`` methods (a ``Pin`` here).
        period_ms: Off time between pulses, in milliseconds.
    """
    while True:
        led.on()
        await uasyncio.sleep_ms(5)
        led.off()
        await uasyncio.sleep_ms(period_ms)
async def main(led1, led2):
    """Blink two LEDs at different rates for 10 seconds.

    Args:
        led1: LED pulsed with a 700 ms off period.
        led2: LED pulsed with a 400 ms off period.
    """
    uasyncio.create_task(blink(led1, 700))
    uasyncio.create_task(blink(led2, 400))
    await uasyncio.sleep_ms(10_000)


uasyncio.run(main(Pin(0, Pin.OUT), Pin(1, Pin.OUT)))
Page 325
import uasyncio
from machine import Pin
from time import sleep_ms
async def blink(led, period_ms):
    """Pulse ``led`` forever: on for 5 ms, then off for ``period_ms``.

    Args:
        led: Object with ``on()`` and ``off()`` methods (a ``Pin`` here).
        period_ms: Off time between pulses, in milliseconds.
    """
    while True:
        led.on()
        await uasyncio.sleep_ms(5)
        led.off()
        await uasyncio.sleep_ms(period_ms)
async def timewaste():
    """Loop forever, blocking for 10 ms and then yielding on each pass.

    ``sleep_ms`` here is the blocking ``time.sleep_ms``, so no other task
    can run during those 10 ms; only the ``uasyncio.sleep_ms(0)`` hands
    control back to the scheduler.
    NOTE(review): presumably a deliberate stand-in for blocking work.
    """
    while True:
        sleep_ms(10)
        await uasyncio.sleep_ms(0)
async def main(led1, led2):
    """Blink two LEDs alongside the ``timewaste`` task for 10 seconds.

    Args:
        led1: LED pulsed with a 700 ms off period.
        led2: LED pulsed with a 400 ms off period.
    """
    uasyncio.create_task(blink(led1, 700))
    uasyncio.create_task(blink(led2, 400))
    uasyncio.create_task(timewaste())
    await uasyncio.sleep_ms(10_000)
Page 327
import uasyncio
from time import sleep_ms
from machine import Pin, Timer
import rp2
import network
def setup(country, ssid, key):
    """Join a WiFi network in station mode, showing progress on the LED.

    The on-board LED toggles every 200 ms while connecting, is left on
    once connected, and toggles every 1000 ms if the attempt failed or
    did not complete within about 20 s.

    Args:
        country: Country code handed to ``rp2.country``.
        ssid: Name of the access point to join.
        key: Password for the access point.

    Returns:
        The ``network.WLAN`` station object, returned whether or not the
        connection succeeded.
    """
    rp2.country(country)
    wifi = network.WLAN(network.STA_IF)
    wifi.active(True)
    wifi.disconnect()

    LED = Pin("LED", Pin.OUT)
    LED.high()

    wifi.connect(ssid, key)
    timer = Timer()
    timer.init(period=200, mode=Timer.PERIODIC,
               callback=lambda t: LED.toggle())

    # Poll every 100 ms for up to 20 s. Status 3 means connected and a
    # negative status means the attempt failed, so stop waiting on either.
    s = 0
    timeout = 20000
    while timeout > 0:
        s = wifi.status()
        if s == 3 or s < 0:
            break
        sleep_ms(100)
        timeout = timeout - 100

    if s != 3:
        # Not connected. This also covers a timeout while still joining or
        # waiting for an IP address (status 0-2), which must not be shown
        # as success.
        timer.init(period=1000, mode=Timer.PERIODIC,
                   callback=lambda t: LED.toggle())
    else:
        timer.deinit()
        LED.high()
    return wifi
async def main():
    """Fetch a page over an asynchronous TCP stream and print the reply.

    Opens a connection to www.example.com on port 80, sends a raw HTTP
    GET, and prints up to the first 512 bytes of the response.
    """
    reader, writer = await uasyncio.open_connection("www.example.com", 80)
    try:
        request = b"GET /index.html HTTP/1.1\r\nHost:example.org\r\n\r\n"
        writer.write(request)
        await writer.drain()
        print(await reader.read(512))
    finally:
        # Shut the stream down with close() followed by wait_closed(),
        # and do it even when the exchange fails.
        writer.close()
        await writer.wait_closed()
# Placeholder credentials, to be replaced with real values. The bare names
# country/ssid/key used previously are not defined anywhere in the listing.
wifi = setup("country", "ssid", "key")
uasyncio.run(main())
Page 330
import uasyncio
import network
import rp2
from machine import Pin, Timer
from time import sleep_ms
import onewire
import ds18x20
def setup(country, ssid, key):
    """Join a WiFi network in station mode, showing progress on the LED.

    The on-board LED toggles every 200 ms while connecting, is left on
    once connected, and toggles every 1000 ms if the attempt failed or
    did not complete within about 20 s.

    Args:
        country: Country code handed to ``rp2.country``.
        ssid: Name of the access point to join.
        key: Password for the access point.

    Returns:
        The ``network.WLAN`` station object, returned whether or not the
        connection succeeded.
    """
    rp2.country(country)
    wifi = network.WLAN(network.STA_IF)
    wifi.active(True)
    wifi.disconnect()

    LED = Pin("LED", Pin.OUT)
    LED.high()

    wifi.connect(ssid, key)
    timer = Timer()
    timer.init(period=200, mode=Timer.PERIODIC,
               callback=lambda t: LED.toggle())

    # Poll every 100 ms for up to 20 s. Status 3 means connected and a
    # negative status means the attempt failed, so stop waiting on either.
    s = 0
    timeout = 20000
    while timeout > 0:
        s = wifi.status()
        if s == 3 or s < 0:
            break
        sleep_ms(100)
        timeout = timeout - 100

    if s != 3:
        # Not connected. This also covers a timeout while still joining or
        # waiting for an IP address (status 0-2), which must not be shown
        # as success.
        timer.init(period=1000, mode=Timer.PERIODIC,
                   callback=lambda t: LED.toggle())
    else:
        timer.deinit()
        LED.high()
    return wifi
wifi = setup("country", "ssid", "key")

# 1-Wire bus on GPIO 22; reset() reports whether any device answered.
ow = onewire.OneWire(Pin(22))
presence = ow.reset()
if presence:
    print("Device present")
else:
    print("No device")

DS = ds18x20.DS18X20(ow)
roms = DS.scan()

# Page served to every client; the comment marker is replaced per request.
template = """<!DOCTYPE html>
<html>
<head> <title>Temperature</title> </head>
<body> <h1>Current Temperature</h1>
Hello Pico W Server World <br/>
The Temperature is: <!--#temp--><br/>
</body>
</html>
"""
async def serve_client(reader, writer):
    """Answer one HTTP client with the current temperature page.

    Args:
        reader: Stream the request is read from.
        writer: Stream the response is written to; always closed on exit.
    """
    print("client")
    try:
        print(await reader.read(512))
        DS.convert_temp()
        # A conversion needs up to 750 ms. Wait without blocking so other
        # tasks keep running, then read the fresh value.
        await uasyncio.sleep_ms(750)
        temp = DS.read_temp(roms[0])
        html = template.replace("<!--#temp-->", str(temp))
        body = html.encode("utf-8")
        # Content-Length counts bytes of the encoded body, not characters.
        headers = ("HTTP/1.1 200 OK\r\n"
                   "Content-Type: text/html; charset=UTF-8\r\n"
                   "Server:Pico\r\n"
                   f"Content-Length:{len(body)}\r\n\r\n"
                   )
        buf = headers.encode("utf-8") + body
        writer.write(buf)
        await writer.drain()
    finally:
        # Release the connection even if reading the sensor or writing the
        # response fails.
        writer.close()
        await writer.wait_closed()
async def main():
    """Start the temperature web server and print a heartbeat every second."""
    # Bind to all interfaces rather than one hard-coded address, so the
    # server works whatever IP the access point assigns to the board.
    await uasyncio.start_server(serve_client, '0.0.0.0', 80, backlog=5)
    while True:
        print("heartbeat")
        await uasyncio.sleep(1)


uasyncio.run(main())